Skip to content

Commit

Permalink
feat: metastore backups
Browse files Browse the repository at this point in the history
  • Loading branch information
spena committed Jul 15, 2020
1 parent f98f530 commit 0f50971
Show file tree
Hide file tree
Showing 10 changed files with 918 additions and 18 deletions.
26 changes: 26 additions & 0 deletions ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,18 @@ public class KsqlConfig extends AbstractConfig {
public static final String KSQL_CREATE_OR_REPLACE_ENABLED_DOC =
"Feature flag for CREATE OR REPLACE";

// Feature flag: when enabled, commands consumed from the command topic are also written
// to a local backup file so the metastore can be restored later.
public static final String KSQL_ENABLE_METASTORE_BACKUP = "ksql.enable.metastore.backup";
public static final Boolean KSQL_ENABLE_METASTORE_BACKUP_DEFAULT = true;
public static final String KSQL_ENABLE_METASTORE_BACKUP_DOC = "Enable the KSQL metastore "
+ "backup service. The backup replays the KSQL command_topic to a file located in the "
+ "same KSQL node. By default, the backup files are located in the private KSQL "
+ "directories.";

// Directory holding the backup files. The default is the empty string; per the doc above
// that means the private KSQL directories are used -- NOTE(review): the fallback logic
// itself is not visible in this file, confirm where the empty value is resolved.
public static final String KSQL_METASTORE_BACKUP_LOCATION = "ksql.metastore.backup.location";
public static final String KSQL_METASTORE_BACKUP_LOCATION_DEFAULT = "";
public static final String KSQL_METASTORE_BACKUP_LOCATION_DOC = "Specify the directory where "
+ "KSQL metastore backup files are located.";

private enum ConfigGeneration {
LEGACY,
CURRENT
Expand Down Expand Up @@ -666,6 +678,20 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) {
Importance.LOW,
KSQL_CREATE_OR_REPLACE_ENABLED_DOC
)
.define(
KSQL_ENABLE_METASTORE_BACKUP,
Type.BOOLEAN,
KSQL_ENABLE_METASTORE_BACKUP_DEFAULT,
Importance.LOW,
KSQL_ENABLE_METASTORE_BACKUP_DOC
)
.define(
KSQL_METASTORE_BACKUP_LOCATION,
Type.STRING,
KSQL_METASTORE_BACKUP_LOCATION_DEFAULT,
Importance.LOW,
KSQL_METASTORE_BACKUP_LOCATION_DOC
)
.withClientSslSupport();

for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,102 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.ksql.execution.json.PlanJsonMapper;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;

import java.io.BufferedWriter;
import java.io.Closeable;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStreamWriter;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;

/**
* A file that is used by the backup service to replay command_topic commands.
*/
/**
 * A file used by the backup service to record command_topic records so they can be
 * replayed later. Each line holds one record as {@code <commandId-json>:<command-json>}.
 *
 * <p>NOTE(review): nothing in this class synchronizes access; assumes a single writer
 * thread -- confirm against callers.
 */
public class BackupReplayFile implements Closeable {
  private static final ObjectMapper MAPPER = PlanJsonMapper.INSTANCE.get();

  // Separator between the serialized CommandId and Command on each line.
  // NOTE(review): readRecords() splits on the FIRST ':' in the line, which is only safe
  // if the serialized CommandId never contains a ':' itself -- confirm its JSON form.
  private static final String KEY_VALUE_SEPARATOR = ":";

  private final File file;
  private final BufferedWriter writer;

  public BackupReplayFile(final File file) {
    this.file = Objects.requireNonNull(file, "file");
    this.writer = createWriter(file);
  }

  private static BufferedWriter createWriter(final File file) {
    try {
      // Append mode: restarting the server must not truncate an existing backup.
      return new BufferedWriter(new OutputStreamWriter(
          new FileOutputStream(file, true),
          StandardCharsets.UTF_8)
      );
    } catch (final FileNotFoundException e) {
      throw new KsqlException(
          String.format("Failed to create replay file: %s", file.getAbsolutePath()), e);
    }
  }

  /**
   * @return the absolute path of the backup file.
   */
  public String getPath() {
    return file.getAbsolutePath();
  }

  /**
   * Appends one record to the file and flushes so the entry survives a crash.
   *
   * @throws IOException if serialization or the file write fails
   */
  public void write(final CommandId commandId, final Command command) throws IOException {
    writer.write(MAPPER.writeValueAsString(commandId));
    writer.write(KEY_VALUE_SEPARATOR);
    writer.write(MAPPER.writeValueAsString(command));
    writer.write("\n");
    writer.flush();
  }

  /**
   * Appends all records, in order, by delegating to {@link #write(CommandId, Command)}.
   */
  public void write(final List<Pair<CommandId, Command>> records) throws IOException {
    for (final Pair<CommandId, Command> record : records) {
      write(record.left, record.right);
    }
  }

  /**
   * Reads back every record previously written to the file.
   *
   * @throws IOException if the file cannot be read or a line cannot be parsed
   */
  public List<Pair<CommandId, Command>> readRecords() throws IOException {
    final List<Pair<CommandId, Command>> commands = new ArrayList<>();
    for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
      // Compute the separator position once; the original scanned the line twice.
      final int separatorIdx = line.indexOf(KEY_VALUE_SEPARATOR);
      final String commandId = line.substring(0, separatorIdx);
      final String command = line.substring(separatorIdx + 1);

      // readValue(String, ...) parses JSON directly; the original did a needless
      // String -> byte[] -> JSON round trip via getBytes().
      commands.add(new Pair<>(
          MAPPER.readValue(commandId, CommandId.class),
          MAPPER.readValue(command, Command.class)
      ));
    }

    return commands;
  }

  @Override
  public void close() throws IOException {
    writer.close();
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018 Confluent Inc.
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
Expand Down Expand Up @@ -41,40 +41,52 @@ public class CommandTopic {

// Consumer reading the command topic; assigned in the constructors below.
private Consumer<CommandId, Command> commandConsumer = null;
private final String commandTopicName;
// Optional backup that records every consumed command; empty disables backups.
// NOTE(review): never reassigned in the visible code -- could likely be final; confirm.
private Optional<CommandTopicBackup> commandTopicBackup;

/**
 * Creates a {@code CommandTopic} backed by a real {@link KafkaConsumer}.
 *
 * @param commandTopicName the command topic to consume from
 * @param kafkaConsumerProperties consumer configs used to build the Kafka consumer
 * @param commandTopicBackup optional backup that records each consumed command
 * @throws NullPointerException if {@code kafkaConsumerProperties} is null
 */
public CommandTopic(
final String commandTopicName,
final Map<String, Object> kafkaConsumerProperties,
final Optional<CommandTopicBackup> commandTopicBackup
) {
this(
commandTopicName,
new KafkaConsumer<>(
// Fixed: the null-check message previously said "kafkaClientProperties",
// which does not match the parameter name.
Objects.requireNonNull(kafkaConsumerProperties, "kafkaConsumerProperties"),
InternalTopicSerdes.deserializer(CommandId.class),
InternalTopicSerdes.deserializer(Command.class)
),
commandTopicBackup
);
}

/**
 * Package-private constructor accepting a pre-built consumer (used by tests and by the
 * public constructor).
 *
 * @throws NullPointerException if any argument is null
 */
CommandTopic(
final String commandTopicName,
final Consumer<CommandId, Command> commandConsumer,
final Optional<CommandTopicBackup> commandTopicBackup
) {
// Validate the topic name BEFORE building the TopicPartition; the original constructed
// the partition first, so a null name would fail without the intended message.
this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName");
this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
this.commandTopicBackup = Objects.requireNonNull(commandTopicBackup, "commandTopicBackup");
}

/**
 * @return the name of the command topic this instance consumes from.
 */
public String getCommandTopicName() {
return commandTopicName;
}

/**
 * Initializes the optional backup and assigns the consumer to partition 0 of the
 * command topic.
 */
public void start() {
commandTopicBackup.ifPresent(CommandTopicBackup::initialize);
commandConsumer.assign(Collections.singleton(commandTopicPartition));
}

/**
 * Polls the command topic, forwards each new record to the backup (when configured),
 * then returns the polled records to the caller.
 */
public Iterable<ConsumerRecord<CommandId, Command>> getNewCommands(final Duration timeout) {
final Iterable<ConsumerRecord<CommandId, Command>> polled = commandConsumer.poll(timeout);

// Guard clause: preserve the original's tolerance of a null poll result.
if (polled == null) {
return null;
}

polled.forEach(this::backupRecord);
return polled;
}

public List<QueuedCommand> getRestoreCommands(final Duration duration) {
Expand All @@ -89,6 +101,8 @@ public List<QueuedCommand> getRestoreCommands(final Duration duration) {
while (!records.isEmpty()) {
log.debug("Received {} records from poll", records.count());
for (final ConsumerRecord<CommandId, Command> record : records) {
backupRecord(record);

if (record.value() == null) {
continue;
}
Expand Down Expand Up @@ -119,5 +133,10 @@ public void wakeup() {

/**
 * Closes the Kafka consumer, then the backup if one is configured.
 */
public void close() {
commandConsumer.close();
commandTopicBackup.ifPresent(CommandTopicBackup::close);
}

// Forwards one consumed record to the backup, if a backup is configured.
private void backupRecord(final ConsumerRecord<CommandId, Command> record) {
commandTopicBackup.ifPresent(b -> b.writeRecord(record));
}
}
Loading

0 comments on commit 0f50971

Please sign in to comment.