Add support for 'maxTotalRows' to incremental publishing kafka indexing task and appenderator based realtime task #6129

Merged · merged 4 commits · Sep 7, 2018

Conversation

@clintropolis (Member) commented Aug 8, 2018

Resolves #5898 by adding getMaxTotalRows and getMaxRowsPerSegment to AppenderatorConfig, extending the model used by IndexTask to IncrementalPublishingKafkaIndexTaskRunner and AppenderatorDriverRealtimeTask.

Additionally, this tweaks the maxRowsPerSegment behavior of Kafka indexing to match the appenderator-based realtime indexing change in #6125.
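For context, a minimal sketch of the kind of row-limit check being extended here (hypothetical names, not the actual IndexTask or Kafka indexing code): the comparisons are inclusive, matching the '>' to '>=' tweak carried over from #6125, and a null maxTotalRows means no total-row limit.

```java
// Hedged sketch with hypothetical names; not the actual IndexTask / Kafka indexing code.
class RowLimitSketch
{
  static boolean shouldPublish(
      long rowsInCurrentSegment,     // rows aggregated into the active segment so far
      long totalRowsAcrossSegments,  // rows across all segments waiting to be published
      int maxRowsPerSegment,
      Long maxTotalRows              // null means "no total-row limit"
  )
  {
    // Inclusive comparisons, matching the '>' to '>=' tweak carried over from #6125.
    final boolean segmentFull = rowsInCurrentSegment >= maxRowsPerSegment;
    final boolean totalLimitReached = maxTotalRows != null && totalRowsAcrossSegments >= maxTotalRows;
    return segmentFull || totalLimitReached;
  }
}
```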

@@ -429,9 +429,6 @@ public void run()
// if stop is requested or task's end offset is set by call to setEndOffsets method with finish set to true
if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
Contributor:

What's the rationale behind breaking here if the last sequence is checkpointed?

Member Author:

I discussed this with @jihoonson: I noticed that previously the task would set its state to publishing and then, if the stop wasn't user-requested, run through a bunch of logic not related to publishing, and breaking here is what was intended to happen. As I understand it, the current code harmlessly but needlessly falls through the logic below before breaking out and publishing.

Contributor:

Got it. Seems fine, since we haven't read anything from the Kafka consumer yet.
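For readers following along, a self-contained sketch of the control flow being discussed (hypothetical names and structure, not the actual IncrementalPublishingKafkaIndexTaskRunner code): the check sits at the top of the ingestion loop, before anything is polled from the Kafka consumer, which is why breaking straight out to the publish path is harmless.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of the loop shape discussed above; not the actual runner code.
class RunLoopSketch
{
  static class Sequence
  {
    private final boolean checkpointed;

    Sequence(boolean checkpointed)
    {
      this.checkpointed = checkpointed;
    }

    boolean isCheckpointed()
    {
      return checkpointed;
    }
  }

  public static void main(String[] args)
  {
    final AtomicBoolean stopRequested = new AtomicBoolean(false);
    final List<Sequence> sequences = new ArrayList<>();
    sequences.add(new Sequence(true)); // the last sequence is already checkpointed

    while (true) {
      // Break out to the publish path before polling the consumer, either because a stop
      // was requested or because the last sequence's end offsets are already set.
      if (stopRequested.get() || sequences.get(sequences.size() - 1).isCheckpointed()) {
        System.out.println("breaking out to publish; nothing has been read from the consumer yet");
        break;
      }
      // ... otherwise poll records from Kafka and add them to the appenderator ...
    }
  }
}
```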

/**
* Maximum number of rows in memory before persisting to local storage
*
* @return
Contributor:

Please get rid of the @return tag if it's not going to have anything useful on it; here, and in a few other places in the file.

long getMaxBytesInMemory();


Contributor:

The extra newline here isn't needed.

* @return
*/
@Nullable
default Long getMaxTotalRows()
Contributor:

Why null instead of Long.MAX_VALUE? That's more rows than one machine could possibly store anyway.

Member Author:

This should never be used, AFAIK; everything that uses this value gets it from JSON. This default is just so I didn't have to add it to RealtimeTuningConfig, which implements AppenderatorConfig.

Contributor:

If it's never going to be used, how about throwing UnsupportedOperationException and marking it non-nullable?

Member Author:

It's nullable to be consistent with how IndexTask was using it. I don't have strong opinions about null vs Long.MAX_VALUE; I'll rework where this is used to get rid of the nullability.

Member Author:

Oh, I hadn't read the comment on that; it looks like IndexTask needs this to be nullable. I'll throw UnsupportedOperationException in the default method at least.
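To make the outcome of this exchange concrete, a hedged sketch of what the interface default could look like (simplified and hypothetical; the actual AppenderatorConfig signature and javadoc may differ): the method stays @Nullable because IndexTask treats null as "no limit", while the default implementation throws so that a config that never supplies the value, like RealtimeTuningConfig here, fails loudly instead of returning something misleading.

```java
import javax.annotation.Nullable;

// Hedged, simplified sketch of the interface shape discussed above; not the exact
// AppenderatorConfig source.
interface AppenderatorConfigSketch
{
  /**
   * Maximum number of rows to aggregate across all segments before publishing;
   * null means there is no total-row limit.
   */
  @Nullable
  default Long getMaxTotalRows()
  {
    // Configs that never supply this value are not expected to call it; fail loudly
    // rather than returning a misleading default such as Long.MAX_VALUE.
    throw new UnsupportedOperationException("maxTotalRows is not implemented by this config");
  }
}
```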

@@ -611,6 +603,7 @@ public Object doCall() throws IOException
}
theSinks.put(identifier, sink);
sink.finishWriting();
totalRows.addAndGet(-sink.getNumRows());
Contributor:

It would make more sense to decrement this at the same time as rowsCurrentlyInMemory and bytesCurrentlyInMemory, rather than here.

Member Author:

Hmm, where are you suggesting? Those values are decremented at persist time; this one happens at publish time.

Contributor:

Oh, yeah, you're right. Let me re-read this with non-dumb eyes.

Contributor:

Ok, I re-read it, less dumbly this time, and it looks good to me. But consider the finishWriting-returning-boolean thing.

@@ -1118,16 +1094,18 @@ public String apply(SegmentIdentifier input)
final boolean removeOnDiskData
)
{
if (sink.isWritable()) {
Contributor:

What's the rationale behind moving this block?

Member Author:

For one, it seemed more legitimate to check that, to match the "we only count active sinks" comment, which implies writable to me. Additionally, since there are a couple of paths to decrementing the totalRows counter, I wanted to make sure I never double-decremented it (its other decrement is also tied to finishWriting).

Contributor:

Hmm, to prevent races (abandonSegment could be called in a separate thread), how about having sink.finishWriting() return true or false, corresponding to its writability state before the method was called? Then, only decrement the counters if it returns true.

Member Author:

👍
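A rough, self-contained sketch of the suggested pattern (hypothetical names, not the actual Druid Sink or AppenderatorImpl code): finishWriting() reports whether this particular call was the one that transitioned the sink out of its writable state, so racing callers such as the publish path and abandonSegment can't both decrement the shared row counter.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

// Hedged sketch of the race-safe decrement pattern suggested above; hypothetical names,
// not the actual Druid Sink / AppenderatorImpl code.
class SinkSketch
{
  private final AtomicBoolean writable = new AtomicBoolean(true);
  private final long numRows = 123L; // rows held by this sink (fixed for the sketch)

  /** Returns true only for the single caller that transitions the sink out of its writable state. */
  boolean finishWriting()
  {
    return writable.compareAndSet(true, false);
  }

  long getNumRows()
  {
    return numRows;
  }
}

class DriverSketch
{
  private final AtomicLong totalRows = new AtomicLong(123L);

  // Called from both the publish path and abandonSegment, possibly on different threads.
  void closeSink(SinkSketch sink)
  {
    if (sink.finishWriting()) {
      // Only the caller that actually finished writing decrements, so totalRows can
      // never be double-decremented for the same sink.
      totalRows.addAndGet(-sink.getNumRows());
    }
  }
}
```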

@gianm (Contributor) left a comment:

👍 after CI

@jihoonson (Contributor) commented:

I'm reviewing this PR.

@@ -117,7 +117,8 @@ The tuningConfig is optional and default parameters will be used if no tuningCon
|`type`|String|The indexing task type, this should always be `kafka`.|yes|
|`maxRowsInMemory`|Integer|The number of rows to aggregate before persisting. This number is the post-aggregation rows, so it is not equivalent to the number of input events, but the number of aggregated rows that those events result in. This is used to manage the required JVM heap size. Maximum heap memory usage for indexing scales with maxRowsInMemory * (2 + maxPendingPersists). Normally user does not need to set this, but depending on the nature of data, if rows are short in terms of bytes, user may not want to store a million rows in memory and this value should be set.|no (default == 1000000)|
|`maxBytesInMemory`|Long|The number of bytes to aggregate in heap memory before persisting. This is based on a rough estimate of memory usage and not actual usage. Normally this is computed internally and user does not need to set it. The maximum heap memory usage for indexing is maxBytesInMemory * (2 + maxPendingPersists). |no (default == One-sixth of max JVM memory)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`maxRowsPerSegment`|Integer|The number of rows to aggregate into a segment; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == 5000000)|
|`maxTotalRows`|Integer|The number of rows to aggregate across all segments; this number is post-aggregation rows. Handoff will happen either if `maxRowsPerSegment` or `maxTotalRows` is hit or every `intermediateHandoffPeriod`, whichever happens earlier.|no (default == unlimited)|

@@ -281,6 +282,7 @@ public KafkaIndexTaskTest(boolean isIncrementalHandoffSupported)
);
}


Contributor:

Please remove this.

@@ -476,7 +478,7 @@ public void testIncrementalHandOff() throws Exception
}
final String baseSequenceName = "sequence0";
// as soon as any segment has more than one record, incremental publishing should happen
maxRowsPerSegment = 1;
maxRowsPerSegment = 2;
Contributor:

@clintropolis would you tell me why this change is needed?

Member Author:

This PR also matches the behavior of #6125, which changes a '>' to a '>=' and whose logic was moved here in this PR. So as not to modify the test a ton, I upped the count (since with the new behavior the test scenario would be pushing every row).

I updated the main description of the PR to reflect this. Thanks for the reminder, I forgot 👍

@@ -40,6 +40,7 @@
private final int maxRowsInMemory;
private final long maxBytesInMemory;
private final int maxRowsPerSegment;
private final Long maxTotalRows;
Contributor:

Please add @Nullable.


@JsonProperty
@Override
public Long getMaxTotalRows()
Contributor:

@Nullable.

private final long maxBytesInMemory;
private final int maxRowsPerSegment;
private final Long maxTotalRows;
Contributor:

@Nullable

@JsonProperty
public int getMaxRowsPerSegment()
{
return maxRowsPerSegment;
}

@Override
@JsonProperty
public Long getMaxTotalRows()
Contributor:

@Nullable

@@ -1485,6 +1485,7 @@ public long getMaxBytesInMemory()
}

@JsonProperty
@Override
Contributor:

Please add @Nullable here too.

Member Author:

Done (but the changed line is below this comment, so GitHub isn't collapsing this one in the UI).

sink.finishWriting();
if (sink.finishWriting()) {
// Decrement this sink's rows from rowsCurrentlyInMemory (we only count active sinks).
rowsCurrentlyInMemory.addAndGet(-sink.getNumRowsInMemory());
Contributor:

Hmm, is this valid? It looks like this would not be called if this is executed first.

@clintropolis (Member Author) commented:

@jihoonson do you have any additional comments? I believe I've addressed all of the existing review comments.

@jihoonson (Contributor) left a comment:

LGTM. @clintropolis thanks!

@clintropolis force-pushed the kafka-publish-total-rows branch from bcfd2d9 to e1af7cb on August 30, 2018 20:12