-
Notifications
You must be signed in to change notification settings - Fork 14.2k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
KAFKA-12464: follow up PR to refactor codes and add logs #10645
Conversation
// the expected number of members with over minQuota assignment | ||
int expectedNumMembersAssignedOverMinQuota = totalPartitionsCount % numberOfConsumers; | ||
// the number of members with over minQuota partitions assigned | ||
int numMembersAssignedOverMinQuota = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Just a nit -- and to clarify up front, if you agree with this let's still hold off on doing it here so this PR can finally be merged, as I figure any nits can be addressed in your general assign PR:
It's still a bit unclear what this value will be sued for when you first see it, maybe we can work in the word minQuota somewhere in the name? Eg expectedNumMembersWithMoreThanMinQuotaPartitions, or for a slightly shorter example numConsumersAssignedOverMinQuota, or something between or similar to those
FYI I'm also ok with it as-is if you prefer the current name -- just wanted to throw out some other suggestions. I'll trust you to pick whatever name feels right
Make sense! I choose expectedNumMembersAssignedOverMinQuota
and numMembersAssignedOverMinQuota
. :)
// this consumer is potential maxQuota candidate since we're still under the number of expected members | ||
// with more than the minQuota partitions. Note, if the number of expected members with more than | ||
// the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers | ||
if (numMembersAssignedOverMinQuota < expectedNumMembersAssignedOverMinQuota) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit (again, please address this in the other PR so I can merge this one): as Guozhang pointed out in another comment, in the case minQuota == maxQuota, this comment is a bit misleading as the number of expected max capacity members is technically all of them, but the variable expectedNumMembersHavingMorePartitions refers to the number of members who have more than the minQuota number of partitions, which in that case would actually be zero.
Agree! I refer to your suggested change except this consumer may be assigned one more partition
(explain below), and add this line:
Note, if the number of expected members with more than the minQuota partitions is 0, it means minQuota == maxQuota, so they won't be put into unfilledMembers
This should make it more clear.
Just a thought: technically it's not even a "potential maxQuota" member, since as you pointed out in another comment "the unassignedPartitions size will always >= unfilledMembers size" -- therefore anything in unfilledMembers will in fact need to receive at least one partition. Does that sound right to you? (this is just a followup question to make sure we're on the same page, no need to do anything for this one)
Not excatly. After what we've changed to add potential maxQuota members into unfilledMembers, the unassignedPartitions size will not always >= unfilledMembers size. There will be cases that the unfilledMembers won't get any additional partition. Here's the example (also in new added tests)
2 topics: t1, t2
total partitions: t1p0, t1p1, t2p0, t2p1, t2p2 ==> 5 partitions
current assignment for c1, c2: (the partition t2p0 was assigning to c3, but c3 dropped)
c1: t1p0, t2p1
c2: t1p1, t2p2
In this situation, the maxQuota is 3, minQuota is 2, expectedNumMembersAssignedOverMinQuota is 1. so, after 1st reassign previously owned partitions phase, the numMembersAssignedOverMinQuota
is still 0, unfilledMembers
will be [c1, c2], and unassignedPartitions will be [t2p0]. After 2nd phase, only c1 will get 1 partition assigned.
@@ -242,6 +244,9 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, | |||
if (unfilledMembers.isEmpty()) { | |||
// Should not enter here since we have calculated the exact number to assign to each consumer | |||
// There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners. | |||
int currentPartitionIndex = unassignedPartitions.indexOf(unassignedPartition); | |||
log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}", | |||
unassignedPartitions.subList(currentPartitionIndex, unassignedPartitions.size())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit (for next PR): can you log an error before throwing the exception and include the set of unassigned partitions? Either just print out the unassignedPartitions along with the current partition being processed so you can figure out which partitions are remaining after that, or else by actually computing the remaining partitions that have yet to be assigned. Since it's an error case, I think it's ok to spend a little extra time computing that for better debuggability
Make sense. Added!
log.error("Current number of members with more than the minQuota partitions: {}, is less than the expected number " + | ||
"of members with more than the minQuota partitions: {}, and no more partitions to be assigned to the remaining unfilled consumers: {}", | ||
numMembersAssignedOverMinQuota, expectedNumMembersAssignedOverMinQuota, unfilledMembers); | ||
throw new IllegalStateException("We haven't reached the expected number of members with " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: same here, can you log an error with the remaining unfilledMembers? I know you already do that in the exception message, but imo it would be better to print in a log message instead of an exception, as it may be long
I included most info in the error log, and just put simple error message in exception.
"and no more partitions to be assigned", unfilledMember, minQuota, assignedPartitionsCount)); | ||
log.error("Consumer: [{}] should have {} partitions, but got {} partitions, and no more partitions " + | ||
"to be assigned. The remaining unfilled consumers are: {}", unfilledMember, minQuota, assignedPartitionsCount, unfilledMembers); | ||
throw new IllegalStateException(String.format("Consumer: [%s] doesn't reach minQuota partitions, " + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: the exception here looks good, but once again let's also log an error (it just makes it easier to debug when you have something concrete in the place you encountered the error, whereas exceptions are not always printed right away). Should probably just log any info that could be useful, such as all remaining unfilledMembers
I included most info in the error log, and just put simple error message in exception.
df4b59e
to
34e8806
Compare
"and no more partitions to be assigned", unfilledMember)); | ||
} else { | ||
log.trace("skip over this unfilled member: [{}] because we've reached the expected number of " + | ||
"members with more than the minQuota partitions, and this member already have minQuota partitions", unfilledMember); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: can we add an else case that just logs that we skipped over this member because we reached max capacity and it was still at min? Not sure if debug or trace is more appropriate, might be worth just running the tests with this log in place to see how often it gets printed
I put in trace
level. After running all tests, there are 6 out of 27 tests will print this log.
@ableegoldman , I've addressed all your comments in #10509. Please take a look. Thank you. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM, thanks for following up
Just some unrelated test failures: |
…e-allocations-lz4 * apache-github/trunk: (155 commits) KAFKA-12728: Upgrade gradle to 7.0.2 and shadow to 7.0.0 (apache#10606) KAFKA-12778: Fix QuorumController request timeouts and electLeaders (apache#10688) KAFKA-12754: Improve endOffsets for TaskMetadata (apache#10634) Rework on KAFKA-3968: fsync the parent directory of a segment file when the file is created (apache#10680) MINOR: set replication.factor to 1 to make StreamsBrokerCompatibilityService work with old broker (apache#10673) MINOR: prevent cleanup() from being called while Streams is still shutting down (apache#10666) KAFKA-8326: Introduce List Serde (apache#6592) KAFKA-12697: Add Global Topic and Partition count metrics to the Quorum Controller (apache#10679) KAFKA-12648: MINOR - Add TopologyMetadata.Subtopology class for subtopology metadata (apache#10676) MINOR: Update jacoco to 0.8.7 for JDK 16 support (apache#10654) MINOR: exclude all `src/generated` and `src/generated-test` (apache#10671) KAFKA-12772: Move all transaction state transition rules into their states (apache#10667) KAFKA-12758 Added `server-common` module to have server side common classes. (apache#10638) MINOR Removed copying storage libraries specifically as they are already copied. (apache#10647) KAFKA-5876: KIP-216 Part 4, Apply InvalidStateStorePartitionException for Interactive Queries (apache#10657) KAFKA-12747: Fix flakiness in shouldReturnUUIDsWithStringPrefix (apache#10643) MINOR: remove unnecessary placeholder from WorkerSourceTask#recordSent (apache#10659) MINOR: Remove unused `scalatest` definition from `dependencies.gradle` (apache#10655) MINOR: checkstyle version upgrade: 8.20 -> 8.36.2 (apache#10656) KAFKA-12464: minor code cleanup and additional logging in constrained sticky assignment (apache#10645) ...
This is the follow up PR to address the remaining comments in #10509.
Committer Checklist (excluded from commit message)