feat: Replay command topic to local file to backup KSQL Metastore #5831
Conversation
Overall the code LGTM, all of my comments are code-level so I'm happy to give the green tick. I am interested to hear from @rodesai before we merge though about the implications of this in k8s environments (e.g. what happens if k8s decides to move the node? can we easily recover the backup file?)
```java
public List<Pair<CommandId, Command>> readRecords() throws IOException {
  final List<Pair<CommandId, Command>> commands = new ArrayList<>();
  for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
```
can we test what happens if the command itself has a newline in it (or is it possible for the commandId to have the `KEY_VALUE_SEPARATOR` in it - I don't think so)? I know it will make it harder to replay the file, but it might make it safer if we have an encoding: `commandIdSize (4 bytes) | commandSize (4 bytes) | commandId | Command`

another (perhaps better) option is that because we know that the CommandId and the Command are valid JSON, we can just read one valid JSON then the next (see https://stackoverflow.com/a/37395419/2258040) and we don't even have to worry about newlines (which we can add anyway to make it easier for humans to read)

I might be too paranoid, let me know if you don't think this is a problem.
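For reference, a minimal self-contained sketch of the length-prefixed encoding floated above, using only the JDK's `DataOutputStream`/`DataInputStream`. The class and record values here are illustrative stand-ins, not the actual ksqlDB types:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;

// Sketch of: commandIdSize (4 bytes) | commandSize (4 bytes) | commandId | Command.
// Because lengths (not separators) frame the fields, embedded newlines are harmless.
public class LengthPrefixedRecords {
  public static byte[] write(final String commandId, final String command) throws IOException {
    final byte[] idBytes = commandId.getBytes(StandardCharsets.UTF_8);
    final byte[] cmdBytes = command.getBytes(StandardCharsets.UTF_8);
    final ByteArrayOutputStream buf = new ByteArrayOutputStream();
    final DataOutputStream out = new DataOutputStream(buf);
    out.writeInt(idBytes.length);   // commandIdSize
    out.writeInt(cmdBytes.length);  // commandSize
    out.write(idBytes);
    out.write(cmdBytes);
    return buf.toByteArray();
  }

  public static String[] read(final byte[] bytes) throws IOException {
    final DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
    final byte[] id = new byte[in.readInt()];
    final byte[] cmd = new byte[in.readInt()];
    in.readFully(id);
    in.readFully(cmd);
    return new String[] {
        new String(id, StandardCharsets.UTF_8),
        new String(cmd, StandardCharsets.UTF_8)
    };
  }

  public static void main(final String[] args) throws IOException {
    // Round-trip a command containing a newline and the ':' separator character.
    final String[] decoded = read(write("stream/FOO/create", "CREATE STREAM foo\n(id INT);"));
    System.out.println(decoded[0]);
    System.out.println(decoded[1]);
  }
}
```

The trade-off noted in the thread still applies: a binary framing like this cannot be piped straight into `kafka-console-producer`.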
Cool, I was looking for something like that. However, that format will not work with the plan of restoring the topic manually using these commands:
```shell
$ kafka-topics --create --topic $COMMAND_TOPIC --partitions 1 --replication-factor 3
$ kafka-console-producer --broker-list localhost:9092 --topic $COMMAND_TOPIC \
    --property "parse.key=true" --property "key.separator=:" < $BACKUP_FILE
```
If we have time, we could add a `ksql-restore` command that reads the file using your proposal (which I prefer). Perhaps a next release? I'm not sure if we can do it in a compatible way. Just changing the file name might be enough, probably.
Does the proposed restore command work if the command has newlines? If not, then we need to fix that right?
@rodesai There's a test that verifies that in `BackupReplayFileTest.shouldWriteRecordWithNewLineCharacterInCommand`.
As discussed offline, according to the standard it's not possible for a JSON string field to have an embedded newline, so we should be good here. I still think a more explicit format (like what @agavra suggested) is safer (even if it means the backup file isn't immediately usable). Up to you.
ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java
```java
if (files != null) {
  long latestTs = 0;
  for (int i = 0; i < files.length; i++) {
    final File bakFile = files[i];
    final String bakTimestamp = bakFile.getName().substring(prefixFilename.length());

    try {
      final Long ts = Long.valueOf(bakTimestamp);
      if (ts > latestTs) {
        latestTs = ts;
        latestBakFile = bakFile;
      }
    } catch (final NumberFormatException e) {
      LOG.warn(
          "Invalid timestamp '{}' found in backup replay file (file ignored): {}",
          bakTimestamp, bakFile.getName());
      continue;
    }
  }
}
```
another code style thing which might make this a little easier to read:

```java
final Optional<File> latestFile =
    Arrays.stream(files).max(Comparator.comparing(CommandTopicBackup::extractTimestamp));
```
Looks nice! But how do I ignore an `extractTimestamp` that failed? Say the file is `file_123xx`. I would need to extract the timestamp as a String, then a filter that checks isNumeric, then a map to convert to Long, then a max(). Is there a better way to ignore it like your code?
good point... I'm no longer certain this is any better than what you have, but one option is to make `extractTimestamp` return `Long.MIN_VALUE` if there's a failure and ensure the one we got at the end is valid. Feel free to just close this comment :)
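For what it's worth, the two ideas combine into a short stream pipeline. This is a hypothetical sketch (the file-name prefix and class name are made up for illustration): unparseable suffixes map to `Long.MIN_VALUE` and get filtered out before `max()`:

```java
import java.io.File;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class LatestBackupFile {
  private static final String PREFIX = "backup_command_topic_"; // illustrative prefix

  static long extractTimestamp(final File file) {
    try {
      return Long.parseLong(file.getName().substring(PREFIX.length()));
    } catch (final NumberFormatException | IndexOutOfBoundsException e) {
      return Long.MIN_VALUE; // sentinel: invalid names never win the comparison
    }
  }

  static Optional<File> latest(final File[] files) {
    return Arrays.stream(files)
        .filter(f -> extractTimestamp(f) != Long.MIN_VALUE) // drop unparseable names
        .max(Comparator.comparingLong(LatestBackupFile::extractTimestamp));
  }

  public static void main(final String[] args) {
    final File[] files = {
        new File(PREFIX + "100"),
        new File(PREFIX + "123xx"), // ignored: suffix is not a number
        new File(PREFIX + "200")
    };
    System.out.println(latest(files).get().getName());
  }
}
```

Returning `Optional<File>` also covers the case where every candidate file has a bad name.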
```java
  return replayFile;
}

public void writeRecord(final ConsumerRecord<CommandId, Command> record) {
```
while the logic here is correct, it took me a while to understand what it was doing - it might make this code easier to read if you had a method `isRestoring()` and then separated the restore logic into a `checkIfRecordInLatestReplay` method
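As a rough illustration of that refactor, here is a hypothetical sketch (not the actual ksqlDB code) with records simplified to plain Strings instead of CommandId/Command pairs:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the suggested split: writeRecord() consults isRestoring(), and the
// replay-matching logic lives in its own method.
public class BackupSketch {
  private final List<String> latestReplay;
  private int latestReplayIdx = 0;
  private final List<String> written = new ArrayList<>();

  public BackupSketch(final List<String> previousBackup) {
    this.latestReplay = new ArrayList<>(previousBackup);
  }

  private boolean isRestoring() {
    return latestReplayIdx < latestReplay.size();
  }

  private boolean isRecordInLatestReplay(final String record) {
    if (latestReplay.get(latestReplayIdx).equals(record)) {
      latestReplayIdx++; // record was already backed up during a previous run
      return true;
    }
    // Divergence from the previous backup (e.g. the command topic was
    // recreated): stop treating the incoming stream as a replay.
    latestReplay.clear();
    latestReplayIdx = 0;
    return false;
  }

  public void writeRecord(final String record) {
    if (isRestoring() && isRecordInLatestReplay(record)) {
      return; // skip: it is already in the backup file
    }
    written.add(record);
  }

  public List<String> written() {
    return written;
  }
}
```

With this shape, `writeRecord` reads as a single sentence and the replay bookkeeping is isolated for testing.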
Done.
```java
  }
} else if (latestReplayIdx > 0) {
  // clear latest replay from memory
  latestReplay.clear();
```
`Collections.emptyList` will throw an exception if `clear` gets called on it. While I don't think it's possible for this situation to be hit, because `latestReplayIdx` should never be greater than `latestReplay.size()`, we might want to change this to `latestReplay.size() > 0` instead
Done.
```java
final CommandId commandId = new CommandId(
    CommandId.Type.STREAM, streamName, CommandId.Action.CREATE);
final Command command = new Command(
    String.format("CREATE STREAM %s (id INT) WITH (kafka_topic='%s", streamName, streamName),
```
let's add a test where the command has the newline character (e.g. `CREATE STREAM foo (id INT, "evil \n field" INT)...`)
Done.
```java
@Before
public void setup() {
  commandTopicBackup = new CommandTopicBackup(
```
can we add a test where the backup file is corrupted and we can't properly read it?
@agavra typically in a k8s setup you'd run ksql in a statefulset, and each pod would have an associated storage volume. It's up to the administrator to configure ksql to write this backup to the storage volume (as is the case with state stores).
```java
public static final Boolean KSQL_ENABLE_METASTORE_BACKUP_DEFAULT = false;
public static final String KSQL_ENABLE_METASTORE_BACKUP_DOC = "Enable the KSQL metastore "
    + "backup service. The backup replays the KSQL command_topic to a file located in the "
    + "same KSQL node. By default, the backup files are located in the private KSQL "
```
what's meant by `private KSQL directories` here?
I will remove that default. I had in mind a directory inside the directories created by the TGZ package, like `/tmp/confluent.XXXX/ksql/backups`, but then I thought that is not the case for RPM/DEB packages and Cloud. So I removed the default, and forgot to remove the comment.
Done. Removed comments about private directories.
@spena how would this backup get used? Are there plans to provide a tool for consuming and recovering the command topic from it?
@rodesai The plan to make this a quick feature for 0.11 is to manually restore the command topic using the `kafka-console-producer` command shown above. I would prefer to have a `ksql-restore` command, but that will come in a later release. That above command will be part of the docs.
ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java
- New CommandTopicBackupNoOp to replace Optional parameter
- Refactor writeRecord() logic that checks if record was replayed before
- Minor fixes
Code LGTM
```java
public List<Pair<CommandId, Command>> readRecords() throws IOException {
  final List<Pair<CommandId, Command>> commands = new ArrayList<>();
  for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
```
As discussed offline, according to the standard it's not possible for a JSON string field to have an embedded newline, so we should be good here. I still think a more explicit format (like what @agavra suggested) is safer (even if it means the backup file isn't immediately usable). Up to you.
Thanks @rodesai, I will merge the PR. Things should work temporarily at least. I will add other improvements to this to have a restore command and a better file format. The backup is re-created on every restart, so if someone upgrades to the next release, they will get a fresh new backup with the new format.
Description
Implements KLIP-31: Metastore Backups - #5741
Add a metastore backup service that replays the command_topic to a local file. This local file will be useful to users who desire to restore their command_topic in case of accidental topic disasters.
Two new configs are added to enable backups (see `KsqlConfig`):

- `ksql.enable.metastore.backup` to enable the metastore backup
- `ksql.metastore.backup.location` to specify the location of the backup

Two new classes are created:

- `CommandTopicBackup`
- `BackupReplayFile`

The `CommandTopicBackup` is the service used to back up command topic records through the `writeRecord` method. It is called by the `CommandTopic` class on each record read by the Kafka consumer (see `getRestoreCommands` and `getNewCommands` from the `CommandTopic`). The `CommandTopicBackup` class creates a new backup file (if none exists) or opens a backup file from a previous ksqlDB restart and updates it with only the new commands. It may also create a new backup file in case a record to write does not match the previously replayed records; this is how it handles accidental command_topic deletions.

The `BackupReplayFile` is used to read/write records to a local file, and is used by the `CommandTopicBackup`. Each record is serialized/deserialized using the JSON serde, which is the same one used by the command_topic. Each record written to the file contains only the CommandId (key) and Command (value) separated by a colon. This format allows users to easily restore their command_topic with the `kafka-console-producer` command shown earlier in this conversation.

Other classes modified are:

- `CommandTopic`. Calls the `CommandTopicBackup` on each record read by the command topic consumer.
- `CommandStore`. Creates the `CommandTopicBackup` if enabled, and passes it to the `CommandTopic` class.
class.Testing done
- `show streams` [OK]

Reviewer checklist