feat: Replay command topic to local file to backup KSQL Metastore #5831

Merged
merged 2 commits into from
Jul 16, 2020

Conversation

Member

@spena spena commented Jul 15, 2020

Description

Implements KLIP-31: Metastore Backups - #5741

Add a metastore backup service that replays the command_topic to a local file. Users can use this file to restore their command_topic if the topic is accidentally deleted or otherwise lost.

Two new configs are added to enable backups (see KsqlConfig):

  • ksql.enable.metastore.backup to enable the metastore backup
  • ksql.metastore.backup.location to specify the location of the backup

Two new classes are created:

  • CommandTopicBackup
  • BackupReplayFile

The CommandTopicBackup is the service that backs up command topic records through its writeRecord method. The CommandTopic class calls it for each record read by the Kafka consumer (see getRestoreCommands and getNewCommands in CommandTopic). The CommandTopicBackup class creates a new backup file if none exists; otherwise it opens the backup file left by the previous ksqlDB run and appends only new commands to it. If a record to write does not match the records replayed in the previous ksqlDB run, it creates a new backup file instead. This is how it handles accidental command_topic deletion.
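
The decision described above can be sketched roughly as follows. This is a simplified in-memory model, not the code in this PR: records are plain strings, "files" are lists, and the class name, the isRestoring helper, and the choice to copy the already-matched prefix into the new file are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory model of the backup decision logic. */
final class BackupSketch {
  // Records loaded from the latest backup file at startup.
  private final List<String> latestReplay;
  // Index of the next backup record we expect to see while replaying.
  private int latestReplayIdx = 0;
  // Every "file" created so far; the last one is the active backup.
  private final List<List<String>> files = new ArrayList<>();

  BackupSketch(final List<String> previousBackup) {
    this.latestReplay = new ArrayList<>(previousBackup);
    // Reopen the previous backup so that new commands are appended to it.
    files.add(new ArrayList<>(previousBackup));
  }

  private boolean isRestoring() {
    return latestReplayIdx < latestReplay.size();
  }

  void writeRecord(final String record) {
    if (isRestoring()) {
      if (latestReplay.get(latestReplayIdx).equals(record)) {
        latestReplayIdx++; // already present in the backup, nothing to write
        return;
      }
      // Mismatch: the topic no longer agrees with the backup. Start a new
      // file (assumption: seeded with the records matched so far) and stop
      // comparing against the old replay.
      files.add(new ArrayList<>(latestReplay.subList(0, latestReplayIdx)));
      latestReplay.clear();
      latestReplayIdx = 0;
    }
    activeFile().add(record);
  }

  List<String> activeFile() {
    return files.get(files.size() - 1);
  }

  int fileCount() {
    return files.size();
  }
}
```

With a previous backup of [s1], replaying s1 and then s2 appends only s2 to the same file. With a previous backup of [s2], replaying s1 triggers a new file that ends up holding s1 and s2.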

The BackupReplayFile reads and writes records to a local file and is used by the CommandTopicBackup. Each record is serialized/deserialized using the JSON serde, the same one used by the command_topic. Each record written to the file contains only the CommandId (key) and the Command (value), separated by a colon. This format lets users restore their command_topic by running the following Kafka command:

$ kafka-console-producer --broker-list localhost:9092 --topic $COMMAND_TOPIC --property "parse.key=true" --property "key.separator=:" < $BACKUP_FILE
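
For illustration, the one-record-per-line layout could look like the sketch below. The class and method names are hypothetical, the key and value are assumed to be already serialized to single-line JSON, and splitting at the first separator assumes the serialized key never contains a colon.

```java
/** Hypothetical helper showing the key:value line layout of the backup file. */
final class ReplayLineSketch {
  private static final String KEY_VALUE_SEPARATOR = ":";

  /** Joins an already-serialized key and value into one backup line. */
  static String toLine(final String keyJson, final String valueJson) {
    return keyJson + KEY_VALUE_SEPARATOR + valueJson;
  }

  /** Splits a backup line at the first separator into {key, value}. */
  static String[] fromLine(final String line) {
    final int idx = line.indexOf(KEY_VALUE_SEPARATOR);
    if (idx < 0) {
      throw new IllegalArgumentException("Invalid backup line: " + line);
    }
    return new String[] {
        line.substring(0, idx),
        line.substring(idx + KEY_VALUE_SEPARATOR.length())
    };
  }
}
```

Only the first colon is treated as the separator, so the colons inside the JSON value do not matter.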

Other classes modified are:

  • CommandTopic. Calls the CommandTopicBackup on each record read by the command topic consumer.
  • CommandStore. Creates the CommandTopicBackup if enabled, and passes it to the CommandTopic class.

Testing done

  • Added unit tests on the new and modified classes
  • Verified manually by executing the following scenarios:
    1. Create 1 stream, stop ksqlDB, delete/restore the command topic, start ksqlDB, run show streams [OK]
    2. Create 1 stream, stop ksqlDB, delete the backup file, start ksqlDB, check the backup file is created again [OK]
    3. Create 2 streams, stop ksqlDB, delete 2nd stream from backup, start ksqlDB, check only 2nd stream is added back [OK]
    4. Create 2 streams, stop ksqlDB, delete 1st stream from backup, start ksqlDB, check a new backup is created [OK]

Reviewer checklist

  • Ensure docs are updated if necessary. (eg. if a user visible feature is being added or changed).
  • Ensure relevant issues are linked (description should include text like "Fixes #")

@spena spena requested review from agavra and a team July 15, 2020 15:02
@spena spena force-pushed the metastore_backup branch from 0f50971 to 8e1da4d Compare July 15, 2020 16:23
@spena spena force-pushed the metastore_backup branch from 8e1da4d to 8c8b54c Compare July 15, 2020 17:52
Contributor

@agavra agavra left a comment

Overall the code LGTM, all of my comments are code-level so I'm happy to give the green tick. I am interested to hear from @rodesai before we merge though about the implications of this in k8s environments (e.g. what happens if k8s decides to move the node? can we easily recover the backup file?)


public List<Pair<CommandId, Command>> readRecords() throws IOException {
  final List<Pair<CommandId, Command>> commands = new ArrayList<>();
  for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
Contributor

can we test what happens if the command itself has a newline in it (or is it possible for the commandId to have the KEY_VALUE_SEPARATOR in it - I don't think so)? I know it will make it harder to replay the file, but it might be safer to use an explicit encoding: commandIdSize (4 bytes) | commandSize (4 bytes) | commandId | Command

another (perhaps better) option is that because we know that the CommandID and the Command are valid JSON, we can just read one valid JSON then the next (see https://stackoverflow.com/a/37395419/2258040) and we don't even have to worry about newlines (which we can add anyway to make it easier for humans to read)

I might be too paranoid, let me know if you don't think this is a problem.
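
A rough sketch of the length-prefixed layout proposed above (commandIdSize | commandSize | commandId | Command), assuming UTF-8 payloads and big-endian 4-byte sizes as written by DataOutputStream. The class and method names are hypothetical and this format is not what the PR implements.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical length-prefixed record encoding; each record is {commandId, command}. */
final class LengthPrefixedSketch {

  static byte[] encode(final List<String[]> records) {
    final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    try (DataOutputStream out = new DataOutputStream(bytes)) {
      for (final String[] record : records) {
        final byte[] id = record[0].getBytes(StandardCharsets.UTF_8);
        final byte[] cmd = record[1].getBytes(StandardCharsets.UTF_8);
        out.writeInt(id.length);  // commandIdSize (4 bytes)
        out.writeInt(cmd.length); // commandSize (4 bytes)
        out.write(id);            // commandId
        out.write(cmd);           // Command
      }
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
    return bytes.toByteArray();
  }

  static List<String[]> decode(final byte[] data) {
    final List<String[]> records = new ArrayList<>();
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
      while (in.available() > 0) {
        final byte[] id = new byte[in.readInt()];
        final byte[] cmd = new byte[in.readInt()];
        in.readFully(id);
        in.readFully(cmd);
        records.add(new String[] {
            new String(id, StandardCharsets.UTF_8),
            new String(cmd, StandardCharsets.UTF_8)
        });
      }
    } catch (final IOException e) {
      throw new UncheckedIOException(e);
    }
    return records;
  }
}
```

Because sizes are explicit, newlines or separator characters inside either field cannot confuse the reader. The trade-off raised in this thread still applies: such a file cannot be piped directly into kafka-console-producer.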

Member Author

Cool, I was looking at something like that. However, that format will not work with the plan of restoring the topic manually using these commands:

$ kafka-topics --bootstrap-server localhost:9092 --create --topic $COMMAND_TOPIC --partitions 1 --replication-factor 3
$ kafka-console-producer --broker-list localhost:9092 --topic $COMMAND_TOPIC \
        --property "parse.key=true" --property "key.separator=:" < $BACKUP_FILE

If we have time, we could add a ksql-restore command that reads the file using your proposal (which I prefer). Perhaps a next release? I'm not sure if we can do it in a compatible way. Just changing the file name might be enough probably.

Contributor

Does the proposed restore command work if the command has newlines? If not, then we need to fix that, right?

Member Author

@rodesai There's a test that verifies that in BackupReplayFileTest.shouldWriteRecordWithNewLineCharacterInCommand.

Contributor

As discussed offline, according to the JSON standard a string value cannot contain a raw newline (it has to be escaped as \n), so we should be good here. I still think a more explicit format (like what @agavra suggested) is safer, even if it means the backup file isn't immediately usable. Up to you.
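
To make the escaping point concrete, here is a minimal hand-rolled JSON string quoter. It is only a stand-in for a real JSON library (the PR relies on the existing JSON serde), and the class and method names are hypothetical.

```java
/** Hypothetical, minimal JSON string quoting to show that newlines are escaped. */
final class JsonNewlineSketch {

  static String quote(final String text) {
    final StringBuilder sb = new StringBuilder("\"");
    for (final char c : text.toCharArray()) {
      switch (c) {
        case '"':  sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n"); break;
        case '\r': sb.append("\\r"); break;
        case '\t': sb.append("\\t"); break;
        default:
          if (c < 0x20) {
            // Remaining control characters must use the \\uXXXX form.
            sb.append(String.format("\\u%04x", (int) c));
          } else {
            sb.append(c);
          }
      }
    }
    return sb.append('"').toString();
  }
}
```

A statement containing a line break comes out as a single physical line, which is what a line-oriented backup file needs.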

Comment on lines 185 to 204
if (files != null) {
  long latestTs = 0;
  for (int i = 0; i < files.length; i++) {
    final File bakFile = files[i];
    final String bakTimestamp = bakFile.getName().substring(prefixFilename.length());

    try {
      final Long ts = Long.valueOf(bakTimestamp);
      if (ts > latestTs) {
        latestTs = ts;
        latestBakFile = bakFile;
      }
    } catch (final NumberFormatException e) {
      LOG.warn(
          "Invalid timestamp '{}' found in backup replay file (file ignored): {}",
          bakTimestamp, bakFile.getName());
      continue;
    }
  }
}
Contributor

another code style thing which might make this a little easier to read:

Optional<File> latestFile = Arrays.stream(files).max(Comparator.comparing(CommandTopicBackup::extractTimestamp));

Member Author

Looks nice! But how do I ignore an extractTimestamp call that failed? Say the file is file_123xx. I would need to extract the timestamp as a String, then apply a filter that checks isNumeric, then a map to convert to Long, then a max(). Is there a better way to ignore it with code like yours?

Contributor

good point... I'm no longer certain this is any better than what you have, but one option is to make extractTimestamp return Long.MIN_VALUE if there's a failure and then ensure the one we get at the end is valid. Feel free to just close this comment :)
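
One way to combine both ideas from this thread is to have the parsing helper return an Optional and filter on it before taking the maximum. The sketch below works on file names rather than File objects to stay self-contained; the class name, the prefix parameter, and this version of extractTimestamp are assumptions, not code from the PR.

```java
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

/** Hypothetical stream-based selection of the newest backup file name. */
final class LatestBackupSketch {

  /** Parses the numeric suffix after the prefix; empty if it is missing or invalid. */
  static Optional<Long> extractTimestamp(final String fileName, final String prefix) {
    if (!fileName.startsWith(prefix)) {
      return Optional.empty();
    }
    try {
      return Optional.of(Long.parseLong(fileName.substring(prefix.length())));
    } catch (final NumberFormatException e) {
      return Optional.empty();
    }
  }

  /** Returns the name with the largest valid timestamp, ignoring malformed names. */
  static Optional<String> latest(final String[] fileNames, final String prefix) {
    return Arrays.stream(fileNames)
        .filter(name -> extractTimestamp(name, prefix).isPresent())
        .max(Comparator.comparing((String name) -> extractTimestamp(name, prefix).get()));
  }
}
```

Unlike the loop version, this drops malformed names silently; a peek or an explicit loop would still be needed to keep the warning log.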

return replayFile;
}

public void writeRecord(final ConsumerRecord<CommandId, Command> record) {
Contributor

while the logic here is correct, it took me a while to understand what it was doing - it might make this code easier to read if you had a method isRestoring() and then separated the restore logic into a checkIfRecordInLatestReplay method

Member Author

Done.

}
} else if (latestReplayIdx > 0) {
// clear latest replay from memory
latestReplay.clear();
Contributor

Collections.emptyList will throw an exception if clear gets called on it. While I don't think it's possible for this situation to be hit, because latestReplayIdx should never be greater than latestReplay.size(), we might want to change this to latestReplay.size() > 0 instead

Member Author

Done.

final CommandId commandId = new CommandId(
CommandId.Type.STREAM, streamName, CommandId.Action.CREATE);
final Command command = new Command(
String.format("CREATE STREAM %s (id INT) WITH (kafka_topic='%s", streamName, streamName),
Contributor

let's add a test where the command has the newline character (e.g. CREATE STREAM foo (id INT, "evil \n field" INT)...)

Member Author

Done.


@Before
public void setup() {
commandTopicBackup = new CommandTopicBackup(
Contributor

can we add a test where the backup file is corrupted and we can't properly read it?

Contributor

@rodesai rodesai left a comment

@agavra typically in a k8s setup you'd run ksql in a statefulset, and each pod would have an associated storage volume. It's up to the administrator to configure ksql to write this backup to the storage volume (as is the case with state stores).

public static final Boolean KSQL_ENABLE_METASTORE_BACKUP_DEFAULT = false;
public static final String KSQL_ENABLE_METASTORE_BACKUP_DOC = "Enable the KSQL metastore "
+ "backup service. The backup replays the KSQL command_topic to a file located in the "
+ "same KSQL node. By default, the backup files are located in the private KSQL "
Contributor

what's meant by private KSQL directories here?

Member Author

I will remove that default. I had in mind a directory inside the directories created by the TGZ package, like /tmp/confluent.XXXX/ksql/backups, but then I thought that is not the case for RPM/DEB packages and Cloud. So I removed the default, and forgot to remove the comment.

Member Author

Done. Removed comments about private directories.

Contributor

rodesai commented Jul 16, 2020

@spena how would this backup get used? Are there plans to provide a tool for consuming and recovering the command topic from it?

Member Author

spena commented Jul 16, 2020

@rodesai The plan to make this a quick feature for 0.11 is to manually restore the command topic using the kafka-console-producer + the backup file. Users will run something like this:

$ kafka-topics --bootstrap-server localhost:9092 --create --topic $COMMAND_TOPIC --partitions 1 --replication-factor 3
$ kafka-console-producer --broker-list localhost:9092 --topic $COMMAND_TOPIC \
        --property "parse.key=true" --property "key.separator=:" < $BACKUP_FILE

I would prefer to have a ksql-restore command or something to have a clean restore, but the above commands work as a temporary solution.

The above commands will be part of the docs.

@spena spena force-pushed the metastore_backup branch from fd37c7e to c4357c7 Compare July 16, 2020 15:43
- New CommandTopicBackupNoOp to replace Optional parameter
- Refactor writeRecord() logic that checks if record was replayed before
- Minor fixes
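
The first item above replaces an Optional parameter with a no-op implementation. A sketch of that pattern follows; the interface and class names are hypothetical and the real classes take Kafka consumer records rather than strings.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical backup interface; the real one accepts consumer records. */
interface BackupSink {
  void writeRecord(String record);
}

/** Used when backups are disabled, so callers need no Optional or null checks. */
final class NoOpBackupSink implements BackupSink {
  @Override
  public void writeRecord(final String record) {
    // Intentionally does nothing.
  }
}

/** Stand-in for the file-backed implementation; it keeps records in memory. */
final class RecordingBackupSink implements BackupSink {
  final List<String> written = new ArrayList<>();

  @Override
  public void writeRecord(final String record) {
    written.add(record);
  }
}

/** Caller that always has a sink and forwards every record it reads. */
final class CommandTopicSketch {
  private final BackupSink backup;

  CommandTopicSketch(final BackupSink backup) {
    this.backup = backup;
  }

  void onRecord(final String record) {
    backup.writeRecord(record);
  }

  /** Chooses the implementation from a flag, as a config-driven factory might. */
  static BackupSink createSink(final boolean backupEnabled) {
    return backupEnabled ? new RecordingBackupSink() : new NoOpBackupSink();
  }
}
```

The caller forwards every record unconditionally; whether anything is persisted is decided once, when the sink is created.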
@spena spena force-pushed the metastore_backup branch from c4357c7 to b4b7dab Compare July 16, 2020 16:14
Contributor

@rodesai rodesai left a comment

Code LGTM


Member Author

spena commented Jul 16, 2020

Thanks, @rodesai. I will merge the PR. Things should work temporarily at least. I will follow up with other improvements, including a restore command and a better file format. The backup is re-created on every restart, so anyone who upgrades to the next release will get a fresh backup in the new format.

@spena spena merged commit 8523051 into confluentinc:master Jul 16, 2020
@spena spena deleted the metastore_backup branch July 16, 2020 21:01
3 participants