Skip to content

Commit

Permalink
fix: address 1st PR feedback
Browse files Browse the repository at this point in the history
- New CommandTopicBackupNoOp to replace Optional parameter
- Refactor writeRecord() logic that checks if record was replayed before
- Minor fixes
  • Loading branch information
spena committed Jul 16, 2020
1 parent f91953f commit c4357c7
Show file tree
Hide file tree
Showing 9 changed files with 317 additions and 228 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -296,8 +296,7 @@ public class KsqlConfig extends AbstractConfig {
public static final Boolean KSQL_ENABLE_METASTORE_BACKUP_DEFAULT = false;
public static final String KSQL_ENABLE_METASTORE_BACKUP_DOC = "Enable the KSQL metastore "
+ "backup service. The backup replays the KSQL command_topic to a file located in the "
+ "same KSQL node. By default, the backup files are located in the private KSQL "
+ "directories.";
+ "same KSQL node.";

public static final String KSQL_METASTORE_BACKUP_LOCATION = "ksql.metastore.backup.location";
public static final String KSQL_METASTORE_BACKUP_LOCATION_DEFAULT = "";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,12 +41,12 @@ public class CommandTopic {

private Consumer<CommandId, Command> commandConsumer = null;
private final String commandTopicName;
private Optional<CommandTopicBackup> commandTopicBackup;
private CommandTopicBackup commandTopicBackup;

public CommandTopic(
final String commandTopicName,
final Map<String, Object> kafkaConsumerProperties,
final Optional<CommandTopicBackup> commandTopicBackup
final CommandTopicBackup commandTopicBackup
) {
this(
commandTopicName,
Expand All @@ -62,7 +62,7 @@ public CommandTopic(
CommandTopic(
final String commandTopicName,
final Consumer<CommandId, Command> commandConsumer,
final Optional<CommandTopicBackup> commandTopicBackup
final CommandTopicBackup commandTopicBackup
) {
this.commandTopicPartition = new TopicPartition(commandTopicName, 0);
this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer");
Expand All @@ -75,7 +75,7 @@ public String getCommandTopicName() {
}

public void start() {
commandTopicBackup.ifPresent(backup -> backup.initialize());
commandTopicBackup.initialize();
commandConsumer.assign(Collections.singleton(commandTopicPartition));
}

Expand Down Expand Up @@ -133,10 +133,10 @@ public void wakeup() {

public void close() {
commandConsumer.close();
commandTopicBackup.ifPresent(backup -> backup.close());
commandTopicBackup.close();
}

private void backupRecord(final ConsumerRecord<CommandId, Command> record) {
commandTopicBackup.ifPresent(backup -> backup.writeRecord(record));
commandTopicBackup.writeRecord(record);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,196 +15,14 @@

package io.confluent.ksql.rest.server;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Ticker;
import io.confluent.ksql.rest.entity.CommandId;
import io.confluent.ksql.rest.server.computation.Command;
import io.confluent.ksql.util.KsqlException;
import io.confluent.ksql.util.Pair;
import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.Collections;
import java.util.List;
import java.util.Objects;
import java.util.Optional;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Backup service that replays the KSQL command_topic to a local file. A new backup file is
* created whenever a new command does not match the actual backup file. Previously replayed
* messages read up to this new command will be written to the new file. This ensures a new
* complete backup of the command_topic is created.
*/
public class CommandTopicBackup {
private static final Logger LOG = LoggerFactory.getLogger(CommandTopicBackup.class);
private static final Ticker CURRENT_MILLIS_TICKER = new Ticker() {
@Override
public long read() {
return System.currentTimeMillis();
}
};
private static final String PREFIX = "backup_";

private final File backupLocation;
private final String topicName;
private final Ticker ticker;

private BackupReplayFile replayFile;
private List<Pair<CommandId, Command>> latestReplay;
private int latestReplayIdx;

public CommandTopicBackup(final String location, final String topicName) {
this(location, topicName, CURRENT_MILLIS_TICKER);
}

public CommandTopicBackup(final String location, final String topicName, final Ticker ticker) {
final File dir = new File(Objects.requireNonNull(location, "location"));
if (!dir.exists() || !dir.isDirectory()) {
throw new KsqlException(String.format(
"Backup location '%s' does not exist or it is not a directory.", location));
}

this.backupLocation = dir;
this.topicName = Objects.requireNonNull(topicName, "topicName");
this.ticker = Objects.requireNonNull(ticker, "ticker");
}

public void initialize() {
replayFile = openOrCreateReplayFile();

try {
latestReplay = replayFile.readRecords();
} catch (final IOException e) {
LOG.warn("Failed to read the latest backup from {}. Continue with a new file. Error = {}",
replayFile.getPath(), e.getMessage());

replayFile = newReplayFile();
latestReplay = Collections.emptyList();
}

latestReplayIdx = 0;
LOG.info("Command topic will be backup on file: {}", replayFile.getPath());
}

public void close() {
try {
replayFile.close();
} catch (final IOException e) {
LOG.warn("Failed closing the backup file {}. Error = {}",
replayFile.getPath(), e.getMessage());
}
}

@VisibleForTesting
BackupReplayFile getReplayFile() {
return replayFile;
}

public void writeRecord(final ConsumerRecord<CommandId, Command> record) {
if (latestReplayIdx < latestReplay.size()) {
final Pair<CommandId, Command> latestReplayRecord = latestReplay.get(latestReplayIdx);
if (record.key().equals(latestReplayRecord.left)
&& record.value().equals(latestReplayRecord.right)) {
// Ignore backup because record was already replayed
latestReplayIdx++;
return;
} else {
LOG.info("Previous command topic backup does not match the new command topic data. "
+ "A new backup file will be created.");
createNewBackupFile();
LOG.info("New backup file created: {}", replayFile.getPath());
}
} else if (latestReplayIdx > 0) {
// clear latest replay from memory
latestReplay.clear();
latestReplayIdx = 0;
}

try {
replayFile.write(record.key(), record.value());
} catch (final IOException e) {
LOG.warn("Failed to write to file {}. The command topic backup is not complete. "
+ "Make sure the file exists and has permissions to write. KSQL must be restarted "
+ "afterwards to complete the backup process. Error = {}",
replayFile.getPath(), e.getMessage());
}
}

private void createNewBackupFile() {
try {
replayFile.close();
} catch (IOException e) {
LOG.warn("Couldn't close the current backup file {}. Error = {}",
replayFile.getPath(), e.getMessage());
}

replayFile = newReplayFile();

if (latestReplay.size() > 0 && latestReplayIdx > 0) {
try {
replayFile.write(latestReplay.subList(0, latestReplayIdx));
} catch (final IOException e) {
LOG.warn("Couldn't write the latest replayed commands to the new backup file {}. "
+ "Make sure the file exists and has permissions to write. "
+ "KSQL must be restarted afterwards to complete the backup process. Error = {}",
replayFile.getPath(), e.getMessage());
}
}

// clear latest replay from memory
latestReplay.clear();
latestReplayIdx = 0;
}

@VisibleForTesting
BackupReplayFile openOrCreateReplayFile() {
final Optional<BackupReplayFile> latestFile = latestReplayFile();
if (latestFile.isPresent()) {
return latestFile.get();
}

return newReplayFile();
}

private BackupReplayFile newReplayFile() {
return new BackupReplayFile(Paths.get(
backupLocation.getAbsolutePath(),
String.format("%s%s_%s", PREFIX, topicName, ticker.read())
).toFile());
}

private Optional<BackupReplayFile> latestReplayFile() {
final String prefixFilename = String.format("%s%s_", PREFIX, topicName);
final File[] files = backupLocation.listFiles(
(f, name) -> name.toLowerCase().startsWith(prefixFilename));

File latestBakFile = null;
if (files != null) {
long latestTs = 0;
for (int i = 0; i < files.length; i++) {
final File bakFile = files[i];
final String bakTimestamp = bakFile.getName().substring(prefixFilename.length());
public interface CommandTopicBackup {
void initialize();

try {
final Long ts = Long.valueOf(bakTimestamp);
if (ts > latestTs) {
latestTs = ts;
latestBakFile = bakFile;
}
} catch (final NumberFormatException e) {
LOG.warn(
"Invalid timestamp '{}' found in backup replay file (file ignored): {}",
bakTimestamp, bakFile.getName());
continue;
}
}
}
void writeRecord(ConsumerRecord<CommandId, Command> record);

return (latestBakFile != null)
? Optional.of(new BackupReplayFile(latestBakFile))
: Optional.empty();
}
void close();
}
Loading

0 comments on commit c4357c7

Please sign in to comment.