Skip to content

Commit

Permalink
Lint
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Dec 8, 2020
1 parent 00285af commit 0c572b5
Show file tree
Hide file tree
Showing 5 changed files with 57 additions and 17 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -318,8 +318,8 @@ public void cleanupOrphanedInternalTopics(
final ServiceContext serviceContext,
final Set<String> queryApplicationIds
) {
orphanedTransientQueryCleaner.cleanupOrphanedInternalTopics(serviceContext,
queryApplicationIds);
orphanedTransientQueryCleaner
.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1069,7 +1069,7 @@ private static Optional<LocalCommands> createLocalCommands(
final KsqlEngine ksqlEngine
) {
if (!restConfig.getString(KsqlRestConfig.KSQL_LOCAL_COMMANDS_LOCATION_CONFIG).isEmpty()) {
File file
final File file
= new File(restConfig.getString(KsqlRestConfig.KSQL_LOCAL_COMMANDS_LOCATION_CONFIG));
return Optional.of(LocalCommands.open(ksqlEngine, file));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -56,19 +71,16 @@ public void processLocalCommandFiles(
final ServiceContext serviceContext
) {
final FilenameFilter filter = (dir, fileName) -> fileName.endsWith(LOCAL_COMMANDS_FILE_SUFFIX);
File[] files = directory.listFiles(filter);
final File[] files = directory.listFiles(filter);
if (files == null) {
throw new KsqlServerException("Bad directory " + directory.getAbsolutePath());
}
if (files.length == 1) {
System.out.println("Too small");
}
for (File file : files) {
for (final File file : files) {
if (file.equals(currentLocalCommands.getFile())) {
continue;
}
try (LocalCommandsFile localCommandsFile = LocalCommandsFile.createReadonly(file)) {
List<LocalCommand> localCommands = localCommandsFile.readRecords();
final List<LocalCommand> localCommands = localCommandsFile.readRecords();
cleanUpTransientQueryState(localCommands, serviceContext);

markFileAsProcessed(file);
Expand Down Expand Up @@ -143,8 +155,8 @@ public static LocalCommands open(
return new LocalCommands(directory, ksqlEngine, LocalCommandsFile.createWriteable(file));
}

private void markFileAsProcessed(File file) {
File updatedName = new File(file.getParentFile(),
private void markFileAsProcessed(final File file) {
final File updatedName = new File(file.getParentFile(),
file.getName() + LOCAL_COMMANDS_PROCESSED_SUFFIX);
if (!file.renameTo(updatedName)) {
throw new KsqlException("Couldn't rename file " + file.getAbsolutePath());
Expand All @@ -154,14 +166,12 @@ private void markFileAsProcessed(File file) {
private void cleanUpTransientQueryState(
final List<LocalCommand> localCommands,
final ServiceContext serviceContext) {
Set<String> queryApplicationIds = localCommands.stream()
final Set<String> queryApplicationIds = localCommands.stream()
.filter(c -> c.getType() == Type.TRANSIENT_QUERY)
.map(LocalCommand::getQueryApplicationId)
.collect(Collectors.toSet());
if (queryApplicationIds.size() > 0) {
ksqlEngine.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
} else {
System.out.println("queryApplicationIds is " + queryApplicationIds);
}
}

Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -17,7 +32,7 @@
/**
* Represents a single file of commands issued to this node.
*/
public class LocalCommandsFile implements Closeable {
public final class LocalCommandsFile implements Closeable {

private static final ObjectMapper MAPPER = new ObjectMapper();
private static final byte[] NEW_LINE_SEPARATOR_BYTES =
Expand Down Expand Up @@ -50,7 +65,7 @@ public void write(final LocalCommand localCommand) throws IOException {
throw new IOException("Write permission denied.");
}

byte[] bytes = MAPPER.writeValueAsBytes(localCommand);
final byte[] bytes = MAPPER.writeValueAsBytes(localCommand);
writer.write(bytes);
writer.write(NEW_LINE_SEPARATOR_BYTES);
writer.flush();
Expand All @@ -59,7 +74,7 @@ public void write(final LocalCommand localCommand) throws IOException {
public List<LocalCommand> readRecords() throws IOException {
final List<LocalCommand> localCommands = new ArrayList<>();
for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
LocalCommand localCommand = MAPPER.readValue(line, LocalCommand.class);
final LocalCommand localCommand = MAPPER.readValue(line, LocalCommand.class);
localCommands.add(localCommand);
}
return localCommands;
Expand Down

0 comments on commit 0c572b5

Please sign in to comment.