fix: Removes orphaned topics from transient queries #6714
Conversation
There are some drawbacks with this approach:
I don't have any other alternatives off the top of my head, I'll give it some thought, but I think this merits further discussion. If we do decide to go down this path, might make sense to document this in a quick KLIP (I promise I'd review it promptly!). (EDIT: or at least get multiple eyes on this)
That's pretty much what I had in mind. You effectively are delegating sticky id generation (and enforcing id uniqueness) to whatever configures ksql -- in a cloud environment, it could be Kubernetes. In a more static environment, it would just be a one-time generated id.
That's true. Given that this was being used for GCing, imperfect cleanup was considered acceptable, but I agree this isn't ideal. I was trying to keep this somewhat simple since it's about cleanup rather than an essential part of functionality.
That's fair. People might not fully understand the limitations of how I had intended for this to be used and could misuse it.
I agree, I think that's fair. Most of the alternatives I thought of for issuing/registering ids are a fair bit more complex. If you want to recreate this kind of sticky id:
Maybe predictable/sticky ids aren't required and generating long random ids is fine (including for restarts), and no consensus is required for that phase. Even if so, you probably need a coordinator for the cleanup phase to ensure that's done once. Of course, in order to clean things up, you have to decide that the owner of some id is dead or has been restarted, and a similar protocol must then exist for a coordinator marking that id as dead, waiting a bit, and then GCing the topic. We already do heartbeating for pull queries and could utilize that. I actually thought of using the advertised listener instead of
Ideally, we'd have an alternative that doesn't involve ID generation at all. I think if we are going down that path, what you have here is the best for what we need. We can always make the ID generation more flexible (i.e. the suggestions you have) but we'll never be able to make it less - so it makes sense to start with static ID generation. What do you think of a stupid-simple approach of just committing a list of transient queries to disk? That way we can track them and on start-up we can just delete all the ones that we were handling. It still has the potential to leak if a node goes down and never comes up, but that was a problem in either approach.
I agree that going the route of adding a per-node ID should have a KLIP. Adding this would be useful for more than just this type of garbage collection, but at the same time requires some care from an operator and has potential compatibility concerns. We should define: That said, if we do have to set an ID it's trivial to set a unique predictable ID in CCloud, so that's not a concern, at least for us. I do like the idea of keeping a registry of running queries on the local disk as an alternative to the above. It's expected that ksqlDB should have access to stable storage to function optimally, so we're not adding a new requirement here.
I agree that defining such a thing would require answering a lot of the questions you stated above. I think that it's probably out of the scope of this GC case to try and tackle, since GC is potentially much simpler.
I considered this and I think it's a reasonable approach. There's one drawback to only using local disk: if the same logical node gets restarted on a new machine, cleanup isn't possible since there will be nothing saved to the new machine's local disk. Not sure how often that happens in various cloud environments, but it's certainly possible. That said, if we're aiming to take care of the common case and this covers it, then that's fine by me.
+1 I think that's what we should be aiming for - like you said earlier, it's mostly to GC things, so if we miss it a few times in exceptional cases that's alright; the user can always clean up these topics on their own (and there were exceptional cases in both proposals). It also makes life easier by avoiding thinking about the hard(er) problems 😅
overall lgtm, some inline comments
```java
}
// Find any transient query topics
final Set<String> orphanedQueryApplicationIds = topicNames.stream()
    .map(topicName -> queryApplicationIds.stream().filter(topicName::startsWith).findFirst())
```
should we be using `startsWith`? I thought that in cloud we prefix all the topics with things other than just the transient application id. `contains` might be a safer bet either way
So far as I can tell, nothing is very different in the cloud beyond setting some configs differently when building the query application id. After all, as far as I can tell, it's Streams that creates these topics, and it just appends to the query application id.
I verified this manually with the transient topic names, so I'm pretty confident startsWith should work.
ack
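For reference, here's a minimal self-contained sketch of the prefix-matching pipeline under discussion; the topic names and application ids below are made-up examples, not real cloud naming.

```java
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;

public final class PrefixMatchSketch {
  public static void main(String[] args) {
    // Hypothetical topic names as they might exist in the cluster
    final Set<String> topicNames = Set.of(
        "transient_4711_1-Aggregate-GroupBy-repartition",
        "transient_4711_1-Aggregate-Aggregate-Materialize-changelog",
        "some_unrelated_topic");
    // Hypothetical transient query application ids read back from the local commands file
    final Set<String> queryApplicationIds = Set.of("transient_4711_1");

    // For each topic, look for a recorded application id that is a prefix of the topic name;
    // topics without a match are dropped, leaving only ids that still own topics.
    final Set<String> orphanedQueryApplicationIds = topicNames.stream()
        .map(topicName -> queryApplicationIds.stream()
            .filter(topicName::startsWith)
            .findFirst())
        .filter(Optional::isPresent)
        .map(Optional::get)
        .collect(Collectors.toSet());

    System.out.println(orphanedQueryApplicationIds); // [transient_4711_1]
  }
}
```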
```java
final KafkaTopicClient topicClient = serviceContext.getTopicClient();
final Set<String> topicNames;
try {
  topicNames = topicClient.listTopicNames();
```
if I recall, this is actually a somewhat expensive metadata operation (though we may want to confirm with Kafka experts) if we'll be calling it regularly. Should we flip the semantics (i.e. check whether a topic corresponding to an orphaned query exists) instead of the other way around?
In theory, it should process just one set of commands at a time during startup (it shouldn't be possible for there to be more than one unprocessed file). When it does that, it should call this just once. So it does this list call on node startup and uses that list of topics to compare with all of the application query ids in the commands file. It would seem more expensive to check, for every queryApplicationId, whether a given topic exists. Not only is that more RPCs, it also might check plenty of completed transient queries that cleaned up with no issue. I considered adding a "done cleaning" entry to the commands so I could find the set of only orphaned queries, but that seems potentially error-prone as well and requires writing another entry. This seemed a little simpler.
Tell me if you'd prefer the "done cleaning" approach.
Yeah I agree the existing PR approach is better. At first, I didn't realize it happens just once on startup but since that's the case this is easier and less error-prone.
```java
  return;
}
// Find any transient query topics
final Set<String> orphanedQueryApplicationIds = topicNames.stream()
```
Maybe I'm missing something, but shouldn't this filter out any that are currently running? Otherwise won't we delete topics for queries that are running?
EDIT: I noticed that what's passed in is only ever the orphaned queryIDs. Can we javadoc this method to explain that?
Yeah, I think we're on the same page now. This is only ever run on startup and so by definition, the query application ids, if found as topic prefixes, would be orphaned. Of course, that list is all transient queries run, not technically just the orphaned ones.
I'll improve the documentation.
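As a sketch of what such a javadoc could say; the interface and method name here are hypothetical illustrations, not the actual code in this PR.

```java
import java.util.Set;

// Hypothetical interface used only to illustrate the requested documentation.
interface OrphanedTopicCleaner {

  /**
   * Schedules cleanup of topics left behind by transient queries that are no longer running.
   *
   * <p>This is only invoked once, on server startup. {@code queryApplicationIds} is expected
   * to contain only the application ids read back from this node's local commands file, i.e.
   * transient queries this node started before its last shutdown. Any id found as a prefix of
   * an existing topic name is treated as orphaned and scheduled for deletion.
   */
  void cleanupOrphanedTopics(Set<String> queryApplicationIds);
}
```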
```java
@Test
public void shouldCleanup_allApplicationIds() {
  when(topicClient.listTopicNames()).thenReturn(ImmutableSet.of(TOPIC1, TOPIC2, TOPIC3));
```
nit: can we follow given/when/then convention? (it's basically already there but without the comments)
Done
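For illustration, a small self-contained test laid out with the given/when/then comments being asked for; the class, stand-in interface, and topic names are hypothetical rather than the actual test in this PR.

```java
import static org.junit.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;

import com.google.common.collect.ImmutableSet;
import java.util.Set;
import java.util.stream.Collectors;
import org.junit.Test;

public class GivenWhenThenSketchTest {

  // Minimal stand-in so the sketch compiles on its own.
  interface TopicLister {
    Set<String> listTopicNames();
  }

  private final TopicLister topicClient = mock(TopicLister.class);

  @Test
  public void shouldCleanupAllApplicationIds() {
    // Given:
    when(topicClient.listTopicNames())
        .thenReturn(ImmutableSet.of("transient_1-changelog", "transient_2-repartition", "other"));

    // When:
    final Set<String> orphaned = topicClient.listTopicNames().stream()
        .filter(topic -> topic.startsWith("transient_"))
        .collect(Collectors.toSet());

    // Then:
    assertEquals(2, orphaned.size());
  }
}
```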
```java
 * A command which is executed locally and possibly requiring cleanup next time the server is
 * restarted.
 */
public class LocalCommand {
```
do you think it makes sense to make this an interface with `JsonSubTypes` so that we can handle a union of lots of different types of LocalCommands (see `ExecutionStep` for example) or is that overkill?
Yeah, that's probably a little nicer. I'll introduce that. I was debating how general to make this since I don't know how many things will take advantage of it.
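A minimal sketch of the polymorphic Jackson setup being suggested; the concrete subtype and property names below are hypothetical, and the actual PR may model the commands differently.

```java
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.annotation.JsonSubTypes;
import com.fasterxml.jackson.annotation.JsonTypeInfo;

// A union of local command types: Jackson writes a type discriminator so new kinds of
// LocalCommand can be added later without breaking files written by older versions.
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "@type")
@JsonSubTypes({
    @JsonSubTypes.Type(value = TransientQueryLocalCommand.class, name = "transient_query")
})
interface LocalCommand {
}

// Hypothetical concrete command recording a transient query's application id.
class TransientQueryLocalCommand implements LocalCommand {

  private final String queryApplicationId;

  @JsonCreator
  TransientQueryLocalCommand(
      @JsonProperty("queryApplicationId") final String queryApplicationId) {
    this.queryApplicationId = queryApplicationId;
  }

  public String getQueryApplicationId() {
    return queryApplicationId;
  }
}
```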
```java
  return new LocalCommands(directory, ksqlEngine, LocalCommandsFile.createWriteable(file));
}

private void markFileAsProcessed(final File file) {
```
why not just delete the file? Do we ever use these?
```java
  throw new IOException("Write permission denied.");
}

final byte[] bytes = MAPPER.writeValueAsBytes(localCommand);
```
just making sure, this writes it in UTF-8 so it's human-readable JSON, right? I think it would be nice to make these files human-readable
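For what it's worth, a quick standalone illustration that Jackson's `writeValueAsBytes` produces UTF-8 encoded, human-readable JSON by default; the field name below is just an example, not the PR's actual file format.

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.nio.charset.StandardCharsets;
import java.util.Map;

public final class Utf8JsonSketch {
  public static void main(String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // ObjectMapper's default JsonFactory encodes output as UTF-8, so the bytes are
    // plain, human-readable JSON text.
    final byte[] bytes = mapper.writeValueAsBytes(Map.of("queryApplicationId", "transient_4711_1"));
    System.out.println(new String(bytes, StandardCharsets.UTF_8));
    // prints: {"queryApplicationId":"transient_4711_1"}
  }
}
```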
```java
    .map(Optional::get)
    .collect(Collectors.toSet());
for (final String queryApplicationId : orphanedQueryApplicationIds) {
  cleanupService.addCleanupTask(
```
since the file is only relevant on a new startup, does it need to be added to the cleanup service or can we just have it do this cleanup on startup?
We chatted about this offline. This takes advantage of the other cleanup routines done by `QueryCleanupService`, so it needs access to it. It does this cleanup once on startup.
hitting the big green button
Description
Fixes #4009
When a server is uncleanly shutdown or killed, it may leave orphaned topics from transient queries. This PR attempts to clean up these topics when the server starts up.
This is accomplished by writing transient query application ids to the local node's disk. This ensures that any given topic is only attempted to be deleted by one node, the one that created it. The data isn't durable, in the sense that a node can shut down and never restart, potentially leaving state uncleaned up.
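As a rough, self-contained illustration of that mechanism (not the actual implementation; the file format, class, and method names here are simplified assumptions):

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

// Simplified sketch: each node appends the application id of every transient query it starts
// to a file on its own disk; on the next startup it reads the file back, finds topics whose
// names start with any recorded id, and schedules those topics for deletion.
public final class OrphanedTopicCleanupSketch {

  private final Path localCommandsFile;

  public OrphanedTopicCleanupSketch(final Path localCommandsFile) {
    this.localCommandsFile = localCommandsFile;
  }

  // Called when a transient query starts on this node.
  public void recordTransientQuery(final String queryApplicationId) throws IOException {
    Files.writeString(
        localCommandsFile,
        queryApplicationId + System.lineSeparator(),
        StandardCharsets.UTF_8,
        StandardOpenOption.CREATE,
        StandardOpenOption.APPEND);
  }

  // Called once on startup, before any new transient queries run, with the full topic listing.
  public Set<String> findOrphanedTopics(final Set<String> allTopicNames) throws IOException {
    if (!Files.exists(localCommandsFile)) {
      return Set.of();
    }
    final List<String> recordedIds =
        Files.readAllLines(localCommandsFile, StandardCharsets.UTF_8);
    return allTopicNames.stream()
        .filter(topic -> recordedIds.stream().anyMatch(topic::startsWith))
        .collect(Collectors.toSet());
  }
}
```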
Testing done
Describe the testing strategy. Unit and integration tests are expected for any behavior changes.
Reviewer checklist