diff --git a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java index a523826ab9ac..79c4ef5464ea 100644 --- a/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java +++ b/ksqldb-common/src/main/java/io/confluent/ksql/util/KsqlConfig.java @@ -292,6 +292,17 @@ public class KsqlConfig extends AbstractConfig { public static final String KSQL_CREATE_OR_REPLACE_ENABLED_DOC = "Feature flag for CREATE OR REPLACE"; + public static final String KSQL_ENABLE_METASTORE_BACKUP = "ksql.enable.metastore.backup"; + public static final Boolean KSQL_ENABLE_METASTORE_BACKUP_DEFAULT = false; + public static final String KSQL_ENABLE_METASTORE_BACKUP_DOC = "Enable the KSQL metastore " + + "backup service. The backup replays the KSQL command_topic to a file located in the " + + "same KSQL node."; + + public static final String KSQL_METASTORE_BACKUP_LOCATION = "ksql.metastore.backup.location"; + public static final String KSQL_METASTORE_BACKUP_LOCATION_DEFAULT = ""; + public static final String KSQL_METASTORE_BACKUP_LOCATION_DOC = "Specify the directory where " + + "KSQL metastore backup files are located."; + private enum ConfigGeneration { LEGACY, CURRENT @@ -679,6 +690,20 @@ private static ConfigDef buildConfigDef(final ConfigGeneration generation) { Importance.LOW, KSQL_CREATE_OR_REPLACE_ENABLED_DOC ) + .define( + KSQL_ENABLE_METASTORE_BACKUP, + Type.BOOLEAN, + KSQL_ENABLE_METASTORE_BACKUP_DEFAULT, + Importance.LOW, + KSQL_ENABLE_METASTORE_BACKUP_DOC + ) + .define( + KSQL_METASTORE_BACKUP_LOCATION, + Type.STRING, + KSQL_METASTORE_BACKUP_LOCATION_DEFAULT, + Importance.LOW, + KSQL_METASTORE_BACKUP_LOCATION_DOC + ) .withClientSslSupport(); for (final CompatibilityBreakingConfigDef compatibilityBreakingConfigDef diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java new file mode 100644 index 000000000000..49d7b64829e7 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/BackupReplayFile.java @@ -0,0 +1,102 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import com.fasterxml.jackson.databind.ObjectMapper; +import io.confluent.ksql.execution.json.PlanJsonMapper; +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Pair; + +import java.io.BufferedWriter; +import java.io.Closeable; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileOutputStream; +import java.io.IOException; +import java.io.OutputStreamWriter; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; + +/** + * A file that is used by the backup service to replay command_topic commands. + */ +public class BackupReplayFile implements Closeable { + private static final ObjectMapper MAPPER = PlanJsonMapper.INSTANCE.get(); + private static final String KEY_VALUE_SEPARATOR = ":"; + + private final File file; + private final BufferedWriter writer; + + public BackupReplayFile(final File file) { + this.file = Objects.requireNonNull(file, "file"); + this.writer = createWriter(file); + } + + private static BufferedWriter createWriter(final File file) { + try { + return new BufferedWriter(new OutputStreamWriter( + new FileOutputStream(file, true), + StandardCharsets.UTF_8) + ); + } catch (final FileNotFoundException e) { + throw new KsqlException( + String.format("Failed to create replay file: %s", file.getAbsolutePath()), e); + } + } + + public String getPath() { + return file.getAbsolutePath(); + } + + public void write(final CommandId commandId, final Command command) throws IOException { + writer.write(MAPPER.writeValueAsString(commandId)); + writer.write(KEY_VALUE_SEPARATOR); + writer.write(MAPPER.writeValueAsString(command)); + writer.write("\n"); + writer.flush(); + } + + public void write(final List> records) throws IOException { + for (final Pair record : records) { + write(record.left, record.right); + } + } + + public List> readRecords() throws IOException { + final List> commands = new ArrayList<>(); + for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) { + final String commandId = line.substring(0, line.indexOf(KEY_VALUE_SEPARATOR)); + final String command = line.substring(line.indexOf(KEY_VALUE_SEPARATOR) + 1); + + commands.add(new Pair<>( + MAPPER.readValue(commandId.getBytes(StandardCharsets.UTF_8), CommandId.class), + MAPPER.readValue(command.getBytes(StandardCharsets.UTF_8), Command.class) + )); + } + + return commands; + } + + @Override + public void close() throws IOException { + writer.close(); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java index 160f3890bb86..d554bd585b1e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018 Confluent Inc. + * Copyright 2020 Confluent Inc. * * Licensed under the Confluent Community License (the "License"); you may not use * this file except in compliance with the License. You may obtain a copy of the @@ -41,10 +41,12 @@ public class CommandTopic { private Consumer commandConsumer = null; private final String commandTopicName; + private CommandTopicBackup commandTopicBackup; public CommandTopic( final String commandTopicName, - final Map kafkaConsumerProperties + final Map kafkaConsumerProperties, + final CommandTopicBackup commandTopicBackup ) { this( commandTopicName, @@ -52,17 +54,20 @@ public CommandTopic( Objects.requireNonNull(kafkaConsumerProperties, "kafkaClientProperties"), InternalTopicSerdes.deserializer(CommandId.class), InternalTopicSerdes.deserializer(Command.class) - ) + ), + commandTopicBackup ); } CommandTopic( final String commandTopicName, - final Consumer commandConsumer + final Consumer commandConsumer, + final CommandTopicBackup commandTopicBackup ) { this.commandTopicPartition = new TopicPartition(commandTopicName, 0); this.commandConsumer = Objects.requireNonNull(commandConsumer, "commandConsumer"); this.commandTopicName = Objects.requireNonNull(commandTopicName, "commandTopicName"); + this.commandTopicBackup = Objects.requireNonNull(commandTopicBackup, "commandTopicBackup"); } public String getCommandTopicName() { @@ -70,11 +75,18 @@ public String getCommandTopicName() { } public void start() { + commandTopicBackup.initialize(); commandConsumer.assign(Collections.singleton(commandTopicPartition)); } public Iterable> getNewCommands(final Duration timeout) { - return commandConsumer.poll(timeout); + final Iterable> iterable = commandConsumer.poll(timeout); + + if (iterable != null) { + iterable.forEach(record -> backupRecord(record)); + } + + return iterable; } public List getRestoreCommands(final Duration duration) { @@ -89,6 +101,8 @@ public List getRestoreCommands(final Duration duration) { while (!records.isEmpty()) { log.debug("Received {} records from poll", records.count()); for (final ConsumerRecord record : records) { + backupRecord(record); + if (record.value() == null) { continue; } @@ -119,5 +133,10 @@ public void wakeup() { public void close() { commandConsumer.close(); + commandTopicBackup.close(); + } + + private void backupRecord(final ConsumerRecord record) { + commandTopicBackup.writeRecord(record); } } diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackup.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackup.java new file mode 100644 index 000000000000..1c5aa811a884 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackup.java @@ -0,0 +1,28 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public interface CommandTopicBackup { + void initialize(); + + void writeRecord(ConsumerRecord record); + + void close(); +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java new file mode 100644 index 000000000000..2a1e1aff9332 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupImpl.java @@ -0,0 +1,225 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Ticker; +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Pair; +import java.io.File; +import java.io.IOException; +import java.nio.file.Paths; +import java.util.Collections; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Backup service that replays the KSQL command_topic to a local file. A new backup file is + * created whenever a new command does not match the actual backup file. Previously replayed + * messages read up to this new command will be written to the new file. This ensures a new + * complete backup of the command_topic is created. + */ +public class CommandTopicBackupImpl implements CommandTopicBackup { + private static final Logger LOG = LoggerFactory.getLogger(CommandTopicBackupImpl.class); + private static final Ticker CURRENT_MILLIS_TICKER = new Ticker() { + @Override + public long read() { + return System.currentTimeMillis(); + } + }; + private static final String PREFIX = "backup_"; + + private final File backupLocation; + private final String topicName; + private final Ticker ticker; + + private BackupReplayFile replayFile; + private List> latestReplay; + private int latestReplayIdx; + + public CommandTopicBackupImpl(final String location, final String topicName) { + this(location, topicName, CURRENT_MILLIS_TICKER); + } + + public CommandTopicBackupImpl( + final String location, + final String topicName, + final Ticker ticker + ) { + final File dir = new File(Objects.requireNonNull(location, "location")); + if (!dir.exists() || !dir.isDirectory()) { + throw new KsqlException(String.format( + "Backup location '%s' does not exist or it is not a directory.", location)); + } + + this.backupLocation = dir; + this.topicName = Objects.requireNonNull(topicName, "topicName"); + this.ticker = Objects.requireNonNull(ticker, "ticker"); + } + + @Override + public void initialize() { + replayFile = openOrCreateReplayFile(); + + try { + latestReplay = replayFile.readRecords(); + } catch (final IOException e) { + LOG.warn("Failed to read the latest backup from {}. Continue with a new file. Error = {}", + replayFile.getPath(), e.getMessage()); + + replayFile = newReplayFile(); + latestReplay = Collections.emptyList(); + } + + latestReplayIdx = 0; + LOG.info("Command topic will be backup on file: {}", replayFile.getPath()); + } + + @Override + public void close() { + try { + replayFile.close(); + } catch (final IOException e) { + LOG.warn("Failed closing the backup file {}. Error = {}", + replayFile.getPath(), e.getMessage()); + } + } + + @VisibleForTesting + BackupReplayFile getReplayFile() { + return replayFile; + } + + private boolean isRestoring() { + return latestReplayIdx < latestReplay.size(); + } + + private boolean isRecordInLatestReplay(final ConsumerRecord record) { + final Pair latestReplayRecord = latestReplay.get(latestReplayIdx); + if (record.key().equals(latestReplayRecord.left) + && record.value().equals(latestReplayRecord.right)) { + latestReplayIdx++; + return true; + } + + return false; + } + + @Override + public void writeRecord(final ConsumerRecord record) { + if (isRestoring()) { + if (isRecordInLatestReplay(record)) { + // Ignore backup because record was already replayed + return; + } else { + LOG.info("Previous command topic backup does not match the new command topic data. " + + "A new backup file will be created."); + createNewBackupFile(); + latestReplay.clear(); + LOG.info("New backup file created: {}", replayFile.getPath()); + } + } else if (latestReplay.size() > 0) { + // clear latest replay from memory + latestReplay.clear(); + } + + try { + replayFile.write(record.key(), record.value()); + } catch (final IOException e) { + LOG.warn("Failed to write to file {}. The command topic backup is not complete. " + + "Make sure the file exists and has permissions to write. KSQL must be restarted " + + "afterwards to complete the backup process. Error = {}", + replayFile.getPath(), e.getMessage()); + } + } + + private void createNewBackupFile() { + try { + replayFile.close(); + } catch (IOException e) { + LOG.warn("Couldn't close the current backup file {}. Error = {}", + replayFile.getPath(), e.getMessage()); + } + + replayFile = newReplayFile(); + + if (latestReplay.size() > 0 && latestReplayIdx > 0) { + try { + replayFile.write(latestReplay.subList(0, latestReplayIdx)); + } catch (final IOException e) { + LOG.warn("Couldn't write the latest replayed commands to the new backup file {}. " + + "Make sure the file exists and has permissions to write. " + + "KSQL must be restarted afterwards to complete the backup process. Error = {}", + replayFile.getPath(), e.getMessage()); + } + } + } + + @VisibleForTesting + BackupReplayFile openOrCreateReplayFile() { + final Optional latestFile = latestReplayFile(); + if (latestFile.isPresent()) { + return latestFile.get(); + } + + return newReplayFile(); + } + + private BackupReplayFile newReplayFile() { + return new BackupReplayFile(Paths.get( + backupLocation.getAbsolutePath(), + String.format("%s%s_%s", PREFIX, topicName, ticker.read()) + ).toFile()); + } + + private Optional latestReplayFile() { + final String prefixFilename = String.format("%s%s_", PREFIX, topicName); + final File[] files = backupLocation.listFiles( + (f, name) -> name.toLowerCase().startsWith(prefixFilename)); + + File latestBakFile = null; + if (files != null) { + long latestTs = 0; + for (int i = 0; i < files.length; i++) { + final File bakFile = files[i]; + final String bakTimestamp = bakFile.getName().substring(prefixFilename.length()); + + try { + final Long ts = Long.valueOf(bakTimestamp); + if (ts > latestTs) { + latestTs = ts; + latestBakFile = bakFile; + } + } catch (final NumberFormatException e) { + LOG.warn( + "Invalid timestamp '{}' found in backup replay file (file ignored): {}", + bakTimestamp, bakFile.getName()); + continue; + } + } + } + + return (latestBakFile != null) + ? Optional.of(new BackupReplayFile(latestBakFile)) + : Optional.empty(); + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupNoOp.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupNoOp.java new file mode 100644 index 000000000000..5fb860def9b8 --- /dev/null +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/CommandTopicBackupNoOp.java @@ -0,0 +1,37 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import org.apache.kafka.clients.consumer.ConsumerRecord; + +public class CommandTopicBackupNoOp implements CommandTopicBackup { + @Override + public void initialize() { + // no-op + } + + @Override + public void writeRecord(final ConsumerRecord record) { + // no-op + } + + @Override + public void close() { + // no-op + } +} diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java index f84ae8f816e1..413a39d3091c 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/KsqlRestApplication.java @@ -636,8 +636,8 @@ static KsqlRestApplication buildApplication( final String commandTopicName = ReservedInternalTopics.commandTopic(ksqlConfig); final CommandStore commandStore = CommandStore.Factory.create( + ksqlConfig, commandTopicName, - ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG), Duration.ofMillis(restConfig.getLong(DISTRIBUTED_COMMAND_RESPONSE_TIMEOUT_MS_CONFIG)), ksqlConfig.addConfluentMetricsContextConfigsKafka( restConfig.getCommandConsumerProperties()), diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java index 407887c7dc01..9d4a697f9fc1 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/CommandStore.java @@ -19,6 +19,10 @@ import com.google.common.collect.Maps; import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.CommandTopic; +import io.confluent.ksql.rest.server.CommandTopicBackup; +import io.confluent.ksql.rest.server.CommandTopicBackupImpl; +import io.confluent.ksql.rest.server.CommandTopicBackupNoOp; +import io.confluent.ksql.util.KsqlConfig; import io.confluent.ksql.util.KsqlException; import io.confluent.ksql.util.KsqlServerException; import java.io.Closeable; @@ -69,8 +73,8 @@ private Factory() { } public static CommandStore create( + final KsqlConfig ksqlConfig, final String commandTopicName, - final String transactionId, final Duration commandQueueCatchupTimeout, final Map kafkaConsumerProperties, final Map kafkaProducerProperties @@ -81,16 +85,34 @@ public static CommandStore create( ); kafkaProducerProperties.put( ProducerConfig.TRANSACTIONAL_ID_CONFIG, - transactionId + ksqlConfig.getString(KsqlConfig.KSQL_SERVICE_ID_CONFIG) ); kafkaProducerProperties.put( ProducerConfig.ACKS_CONFIG, "all" ); + CommandTopicBackup commandTopicBackup = new CommandTopicBackupNoOp(); + if (ksqlConfig.getBoolean(KsqlConfig.KSQL_ENABLE_METASTORE_BACKUP)) { + if (ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION).isEmpty()) { + throw new KsqlException(String.format("Metastore backups is enabled, but location " + + "is empty. Please specify the location with the property '%s'", + KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION)); + } + + commandTopicBackup = new CommandTopicBackupImpl( + ksqlConfig.getString(KsqlConfig.KSQL_METASTORE_BACKUP_LOCATION), + commandTopicName) + ; + } + return new CommandStore( commandTopicName, - new CommandTopic(commandTopicName, kafkaConsumerProperties), + new CommandTopic( + commandTopicName, + kafkaConsumerProperties, + commandTopicBackup + ), new SequenceNumberFutureStore(), kafkaConsumerProperties, kafkaProducerProperties, diff --git a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java index 3b46780603bc..695c4773313e 100644 --- a/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java +++ b/ksqldb-rest-app/src/main/java/io/confluent/ksql/rest/server/computation/QueuedCommand.java @@ -31,10 +31,10 @@ public QueuedCommand( final Optional status, final Long offset ) { - this.commandId = Objects.requireNonNull(commandId); - this.command = Objects.requireNonNull(command); - this.status = Objects.requireNonNull(status); - this.offset = Objects.requireNonNull(offset); + this.commandId = Objects.requireNonNull(commandId, "commandId"); + this.command = Objects.requireNonNull(command,"command"); + this.status = Objects.requireNonNull(status, "status"); + this.offset = Objects.requireNonNull(offset, "offset"); } public CommandId getCommandId() { diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java new file mode 100644 index 000000000000..ba7ad945d7c5 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/BackupReplayFileTest.java @@ -0,0 +1,173 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.util.Pair; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.File; +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.nio.file.Files; +import java.util.Arrays; +import java.util.List; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.is; + +@RunWith(MockitoJUnitRunner.class) +public class BackupReplayFileTest { + private static final String KEY_VALUE_SEPARATOR = ":"; + private static final String REPLAY_FILE_NAME = "backup_command_topic_1"; + + @Rule + public TemporaryFolder backupLocation = new TemporaryFolder(); + + private BackupReplayFile replayFile; + private File internalReplayFile; + + @Before + public void setup() throws IOException { + internalReplayFile = backupLocation.newFile(REPLAY_FILE_NAME); + replayFile = new BackupReplayFile(internalReplayFile); + } + + @Test + public void shouldGetFilePath() { + // When + final String path = replayFile.getPath(); + + // Then + assertThat(path, is(String.format( + "%s/%s", backupLocation.getRoot().getAbsolutePath(), REPLAY_FILE_NAME))); + } + + @Test + public void shouldWriteRecord() throws IOException { + // Given + final Pair record = newStreamRecord("stream1"); + + // When + replayFile.write(record.left, record.right); + + // Then + final List commands = Files.readAllLines(internalReplayFile.toPath()); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0), is( + "\"stream/stream1/create\"" + KEY_VALUE_SEPARATOR + + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"}" + )); + } + + @Test + public void shouldWriteListOfRecords() throws IOException { + // Given + final Pair record1 = newStreamRecord("stream1"); + final Pair record2 = newStreamRecord("stream2"); + + // When + replayFile.write(Arrays.asList(record1, record2)); + + // Then + final List commands = Files.readAllLines(internalReplayFile.toPath()); + assertThat(commands.size(), is(2)); + assertThat(commands.get(0), is( + "\"stream/stream1/create\"" + KEY_VALUE_SEPARATOR + + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"}" + )); + assertThat(commands.get(1), is( + "\"stream/stream2/create\"" + KEY_VALUE_SEPARATOR + + "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"}" + )); + } + + @Test + public void shouldWriteRecordWithNewLineCharacterInCommand() throws IOException { + // Given + final CommandId commandId1 = new CommandId( + CommandId.Type.STREAM, "stream1", CommandId.Action.CREATE); + final Command command1 = new Command( + "CREATE STREAM stream1 (id INT, f\n1 INT) WITH (kafka_topic='topic1)", + Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty() + ); + + // When + replayFile.write(commandId1, command1); + + // Then + final List commands = Files.readAllLines(internalReplayFile.toPath()); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0), is( + "\"stream/stream1/create\"" + KEY_VALUE_SEPARATOR + + "{\"statement\":" + + "\"CREATE STREAM stream1 (id INT, f\\n1 INT) WITH (kafka_topic='topic1)\"}" + )); + } + + @Test + public void shouldBeEmptyWhenReadAllCommandsFromEmptyFile() throws IOException { + // When + final List commands = replayFile.readRecords(); + + // Then + assertThat(commands.size(), is(0)); + } + + @Test + public void shouldReadCommands() throws IOException { + // Given + final Pair record1 = newStreamRecord("stream1"); + final Pair record2 = newStreamRecord("stream2"); + Files.write(internalReplayFile.toPath(), + String.format("%s%s%s%n%s%s%s", + "\"stream/stream1/create\"", + KEY_VALUE_SEPARATOR, + "{\"statement\":\"CREATE STREAM stream1 (id INT) WITH (kafka_topic='stream1')\"}", + "\"stream/stream2/create\"", + KEY_VALUE_SEPARATOR, + "{\"statement\":\"CREATE STREAM stream2 (id INT) WITH (kafka_topic='stream2')\"}" + ).getBytes(StandardCharsets.UTF_8)); + + // When + final List> commands = replayFile.readRecords(); + + // Then + assertThat(commands.size(), is(2)); + assertThat(commands.get(0).left, is(record1.left)); + assertThat(commands.get(0).right, is(record1.right)); + assertThat(commands.get(1).left, is(record2.left)); + assertThat(commands.get(1).right, is(record2.right)); + } + + private Pair newStreamRecord(final String streamName) { + final CommandId commandId = new CommandId( + CommandId.Type.STREAM, streamName, CommandId.Action.CREATE); + final Command command = new Command( + String.format("CREATE STREAM %s (id INT) WITH (kafka_topic='%s')", streamName, streamName), + Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty() + ); + + return new Pair<>(commandId, command); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java new file mode 100644 index 000000000000..c163c4a733e1 --- /dev/null +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicBackupImplTest.java @@ -0,0 +1,295 @@ +/* + * Copyright 2020 Confluent Inc. + * + * Licensed under the Confluent Community License (the "License"); you may not use + * this file except in compliance with the License. You may obtain a copy of the + * License at + * + * http://www.confluent.io/confluent-community-license + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ + +package io.confluent.ksql.rest.server; + +import com.google.common.base.Ticker; +import io.confluent.ksql.rest.entity.CommandId; +import io.confluent.ksql.rest.server.computation.Command; +import io.confluent.ksql.util.KsqlException; +import io.confluent.ksql.util.Pair; +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.TemporaryFolder; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +import java.io.File; +import java.io.IOException; +import java.util.List; +import java.util.Optional; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.containsString; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.not; +import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.when; + +@RunWith(MockitoJUnitRunner.class) +public class CommandTopicBackupImplTest { + private static final String COMMAND_TOPIC_NAME = "command_topic"; + + private Pair command1 = newStreamRecord("stream1"); + private Pair command2 = newStreamRecord("stream2"); + private Pair command3 = newStreamRecord("stream3"); + + @Mock + private Ticker ticker; + + @Rule + public TemporaryFolder backupLocation = new TemporaryFolder(); + + private CommandTopicBackupImpl commandTopicBackup; + + @Before + public void setup() { + commandTopicBackup = new CommandTopicBackupImpl( + backupLocation.getRoot().getAbsolutePath(), COMMAND_TOPIC_NAME, ticker); + } + + private Pair newStreamRecord(final String streamName) { + final CommandId commandId = new CommandId( + CommandId.Type.STREAM, streamName, CommandId.Action.CREATE); + final Command command = new Command( + String.format("CREATE STREAM %s (id INT) WITH (kafka_topic='%s", streamName, streamName), + Optional.empty(), Optional.empty(), Optional.empty(), Optional.empty() + ); + + return new Pair<>(commandId, command); + } + + @Test + public void shouldThrowWhenBackupLocationIsNotDirectory() throws IOException { + // Given + final File file = backupLocation.newFile(); + + // When + final Exception e = assertThrows( + KsqlException.class, + () -> new CommandTopicBackupImpl(file.getAbsolutePath(), COMMAND_TOPIC_NAME) + ); + + // Then + assertThat(e.getMessage(), containsString(String.format( + "Backup location '%s' does not exist or it is not a directory.", + file.getAbsolutePath() + ))); + } + + @Test + public void shouldThrowWhenBackupLocationDoesNotExist() { + // When + final Exception e = assertThrows( + KsqlException.class, + () -> new CommandTopicBackupImpl("/not-existing-directory", COMMAND_TOPIC_NAME) + ); + + // Then + assertThat(e.getMessage(), containsString(String.format( + "Backup location '/not-existing-directory' does not exist or it is not a directory." + ))); + } + + @Test + public void shouldWriteRecordsToReplayFile() throws IOException { + // Given + commandTopicBackup.initialize(); + + // When + final ConsumerRecord record = newConsumerRecord(command1); + commandTopicBackup.writeRecord(record); + + // Then + final List> commands = + commandTopicBackup.getReplayFile().readRecords(); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0).left, is(command1.left)); + assertThat(commands.get(0).right, is(command1.right)); + } + + @Test + public void shouldIgnoreRecordPreviouslyReplayed() throws IOException { + // Given + final ConsumerRecord record = newConsumerRecord(command1); + commandTopicBackup.initialize(); + commandTopicBackup.writeRecord(record); + final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); + + // When + // A 2nd initialize call will open the latest backup and read the previous replayed commands + commandTopicBackup.initialize(); + commandTopicBackup.writeRecord(record); + final BackupReplayFile currentReplayFile = commandTopicBackup.getReplayFile(); + + // Then + final List> commands = currentReplayFile.readRecords(); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0).left, is(command1.left)); + assertThat(commands.get(0).right, is(command1.right)); + assertThat(currentReplayFile.getPath(), is(previousReplayFile.getPath())); + } + + @Test + public void shouldCreateNewReplayFileIfNewRecordsDoNotMatchPreviousBackups() throws IOException { + // Given + final ConsumerRecord record1 = newConsumerRecord(command1); + commandTopicBackup.initialize(); + commandTopicBackup.writeRecord(record1); + final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); + + // When + // A 2nd initialize call will open the latest backup and read the previous replayed commands + commandTopicBackup.initialize(); + final ConsumerRecord record2 = newConsumerRecord(command2); + // Need to increase the ticker so the new file has a new timestamp + when(ticker.read()).thenReturn(2L); + // The write command will create a new replay file with the new command + commandTopicBackup.writeRecord(record2); + final BackupReplayFile currentReplayFile = commandTopicBackup.getReplayFile(); + + // Then + List> commands = previousReplayFile.readRecords(); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0).left, is(command1.left)); + assertThat(commands.get(0).right, is(command1.right)); + commands = currentReplayFile.readRecords(); + assertThat(commands.size(), is(1)); + assertThat(commands.get(0).left, is(command2.left)); + assertThat(commands.get(0).right, is(command2.right)); + assertThat(currentReplayFile.getPath(), not(previousReplayFile.getPath())); + } + + @Test + public void shouldWritePreviousReplayedRecordsAlreadyChecked() throws IOException { + // Given + final ConsumerRecord record1 = newConsumerRecord(command1); + final ConsumerRecord record2 = newConsumerRecord(command2); + commandTopicBackup.initialize(); + commandTopicBackup.writeRecord(record1); + commandTopicBackup.writeRecord(record2); + final BackupReplayFile previousReplayFile = commandTopicBackup.getReplayFile(); + + // When + // A 2nd initialize call will open the latest backup and read the previous replayed commands + commandTopicBackup.initialize(); + // Need to increase the ticker so the new file has a new timestamp + when(ticker.read()).thenReturn(2L); + // command1 is ignored because it was previously replayed + commandTopicBackup.writeRecord(record1); + // The write command will create a new replay file with the new command, and command1 will + // be written to have a complete backup + final ConsumerRecord record3 = newConsumerRecord(command3); + commandTopicBackup.writeRecord(record3); + final BackupReplayFile currentReplayFile = commandTopicBackup.getReplayFile(); + + // Then + List> commands = previousReplayFile.readRecords(); + assertThat(commands.size(), is(2)); + assertThat(commands.get(0).left, is(command1.left)); + assertThat(commands.get(0).right, is(command1.right)); + assertThat(commands.get(1).left, is(command2.left)); + assertThat(commands.get(1).right, is(command2.right)); + commands = currentReplayFile.readRecords(); + assertThat(commands.size(), is(2)); + assertThat(commands.get(0).left, is(command1.left)); + assertThat(commands.get(0).right, is(command1.right)); + assertThat(commands.get(1).left, is(command3.left)); + assertThat(commands.get(1).right, is(command3.right)); + assertThat(currentReplayFile.getPath(), not(previousReplayFile.getPath())); + } + + @Test + public void shouldCreateNewReplayFileWhenNoBackupFilesExist() { + // Given: + when(ticker.read()).thenReturn(123L); + + // When: + final BackupReplayFile replayFile = commandTopicBackup.openOrCreateReplayFile(); + + // Then: + assertThat(replayFile.getPath(), is(String.format( + "%s/backup_command_topic_123", backupLocation.getRoot().getAbsolutePath() + ))); + } + + @Test + public void shouldOpenLatestReplayFileWhenOneExists() throws IOException { + // Given: + backupLocation.newFile("backup_command_topic_111"); + + // When: + final BackupReplayFile replayFile = commandTopicBackup.openOrCreateReplayFile(); + + // Then: + assertThat(replayFile.getPath(), is(String.format( + "%s/backup_command_topic_111", backupLocation.getRoot().getAbsolutePath() + ))); + } + + @Test + public void shouldOpenLatestReplayFileWhenTwoExist() throws IOException { + // Given: + backupLocation.newFile("backup_command_topic_111"); + backupLocation.newFile("backup_command_topic_222"); + + // When: + final BackupReplayFile replayFile = commandTopicBackup.openOrCreateReplayFile(); + + // Then: + assertThat(replayFile.getPath(), is(String.format( + "%s/backup_command_topic_222", backupLocation.getRoot().getAbsolutePath() + ))); + } + + @Test + public void shouldOpenLatestReplayFileWhenDifferentCommandTopicNamesExist() throws IOException { + // Given: + backupLocation.newFile("backup_command_topic_111"); + backupLocation.newFile("backup_other_command_topic_222"); + + // When: + final BackupReplayFile replayFile = commandTopicBackup.openOrCreateReplayFile(); + + // Then: + assertThat(replayFile.getPath(), is(String.format( + "%s/backup_command_topic_111", backupLocation.getRoot().getAbsolutePath() + ))); + } + + @Test + public void shouldOpenReplayFileAndIgnoreFileWithInvalidTimestamp() throws IOException { + // Given: + backupLocation.newFile("backup_command_topic_111"); + backupLocation.newFile("backup_command_topic_222x"); + + // When: + final BackupReplayFile replayFile = commandTopicBackup.openOrCreateReplayFile(); + + // Then: + assertThat(replayFile.getPath(), is(String.format( + "%s/backup_command_topic_111", backupLocation.getRoot().getAbsolutePath() + ))); + } + + private ConsumerRecord newConsumerRecord( + final Pair record + ) { + return new ConsumerRecord<>("topic", 0, 0, record.left, record.right); + } +} diff --git a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java index 312ea1993cff..110ab0ead059 100644 --- a/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java +++ b/ksqldb-rest-app/src/test/java/io/confluent/ksql/rest/server/CommandTopicTest.java @@ -20,6 +20,7 @@ import static org.hamcrest.core.IsEqual.equalTo; import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -28,24 +29,25 @@ import io.confluent.ksql.rest.entity.CommandId; import io.confluent.ksql.rest.server.computation.Command; import io.confluent.ksql.rest.server.computation.QueuedCommand; + import java.time.Duration; import java.util.Arrays; import java.util.Collection; import java.util.Collections; import java.util.List; import java.util.Optional; -import java.util.concurrent.Future; import org.apache.kafka.clients.consumer.Consumer; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.consumer.ConsumerRecords; -import org.apache.kafka.clients.producer.RecordMetadata; import org.apache.kafka.common.TopicPartition; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.InOrder; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.MockitoJUnitRunner; @RunWith(MockitoJUnitRunner.class) @@ -58,8 +60,7 @@ public class CommandTopicTest { private CommandTopic commandTopic; @Mock - private Future future; - + private CommandTopicBackup commandTopicBackup; @Mock private CommandId commandId1; @Mock @@ -83,7 +84,7 @@ public class CommandTopicTest { @Before @SuppressWarnings("unchecked") public void setup() { - commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer); + commandTopic = new CommandTopic(COMMAND_TOPIC_NAME, commandConsumer, commandTopicBackup); } @Test @@ -246,6 +247,66 @@ public void shouldHaveAllCreateCommandsInOrder() { ))); } + @Test + public void shouldInitializeCommandTopicBackup() { + // When + commandTopic.start(); + + // Then + verify(commandTopicBackup, times(1)).initialize(); + } + + @Test + public void shouldCloseCommandTopicBackup() { + // When + commandTopic.close(); + + // Then + verify(commandTopicBackup, times(1)).close(); + } + + @Test + public void shouldBackupRestoreCommands() { + // Given + final ConsumerRecord record1 = + new ConsumerRecord<>("topic", 0, 0, commandId1, command1); + final ConsumerRecord record2 = + new ConsumerRecord<>("topic", 0, 0, commandId2, command2); + when(commandConsumer.poll(any(Duration.class))) + .thenReturn(someConsumerRecords(record1, record2)) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + commandTopic.start(); + + // When + commandTopic.getRestoreCommands(Duration.ofHours(1)); + + // Then + final InOrder inOrder = Mockito.inOrder(commandTopicBackup); + inOrder.verify(commandTopicBackup, times(1)).writeRecord(record1); + inOrder.verify(commandTopicBackup, times(1)).writeRecord(record2); + } + + @Test + public void shouldBackupNewCommands() { + // Given + final ConsumerRecord record1 = + new ConsumerRecord<>("topic", 0, 0, commandId1, command1); + final ConsumerRecord record2 = + new ConsumerRecord<>("topic", 0, 1, commandId2, command2); + when(commandConsumer.poll(any(Duration.class))) + .thenReturn(someConsumerRecords(record1, record2)) + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); + commandTopic.start(); + + // When + commandTopic.getNewCommands(Duration.ofHours(1)); + + // Then + final InOrder inOrder = Mockito.inOrder(commandTopicBackup); + inOrder.verify(commandTopicBackup, times(1)).writeRecord(record1); + inOrder.verify(commandTopicBackup, times(1)).writeRecord(record2); + } + @Test public void shouldGetEndOffsetCorrectly() { // Given: