Skip to content

Commit

Permalink
Lint
Browse files Browse the repository at this point in the history
  • Loading branch information
AlanConfluent committed Dec 8, 2020
1 parent 79fdf06 commit 5d6f964
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -315,7 +315,8 @@ public void cleanupOrphanedInternalTopics(
final ServiceContext serviceContext,
final Set<String> queryApplicationIds
) {
orphanedTransientQueryCleaner.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
orphanedTransientQueryCleaner
.cleanupOrphanedInternalTopics(serviceContext, queryApplicationIds);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,11 @@

import static java.util.Objects.requireNonNull;

import com.google.common.annotations.VisibleForTesting;
import io.confluent.ksql.exception.KafkaResponseGetFailedException;
import io.confluent.ksql.services.KafkaTopicClient;
import io.confluent.ksql.services.ServiceContext;
import java.util.Optional;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1058,7 +1058,7 @@ private static Optional<LocalCommands> createLocalCommands(
final KsqlEngine ksqlEngine
) {
if (!restConfig.getString(KsqlRestConfig.KSQL_LOCAL_COMMANDS_LOCATION_CONFIG).isEmpty()) {
File file
final File file
= new File(restConfig.getString(KsqlRestConfig.KSQL_LOCAL_COMMANDS_LOCATION_CONFIG));
return Optional.of(LocalCommands.open(ksqlEngine, file));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,8 @@ public CompletableFuture<QueryPublisher> createQueryPublisher(final String sql,
return executeOnWorker(() -> {
try {
return new QueryEndpoint(
ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter, localCommands)
ksqlEngine, ksqlConfig, routingFilterFactory, pullQueryMetrics, rateLimiter,
localCommands)
.createQueryPublisher(
sql,
properties,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.annotation.JsonCreator;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import io.confluent.ksql.engine.KsqlEngine;
Expand Down Expand Up @@ -54,16 +69,16 @@ public void processLocalCommandFiles(
final ServiceContext serviceContext
) {
final FilenameFilter filter = (dir, fileName) -> fileName.endsWith(LOCAL_COMMANDS_FILE_SUFFIX);
File[] files = directory.listFiles(filter);
final File[] files = directory.listFiles(filter);
if (files == null) {
throw new KsqlServerException("Bad directory " + directory.getAbsolutePath());
}
for (File file : files) {
for (final File file : files) {
if (file.equals(currentLocalCommands.getFile())) {
continue;
}
try (LocalCommandsFile localCommandsFile = LocalCommandsFile.createReadonly(file)) {
List<LocalCommand> localCommands = localCommandsFile.readRecords();
final List<LocalCommand> localCommands = localCommandsFile.readRecords();
cleanUpTransientQueryState(localCommands, serviceContext);

markFileAsProcessed(file);
Expand Down Expand Up @@ -132,13 +147,13 @@ public static LocalCommands open(
+ "' config in the properties file."
);
}
File file = new File(directory, String.format("local_commands_%d%s",
final File file = new File(directory, String.format("local_commands_%d%s",
System.currentTimeMillis(), LOCAL_COMMANDS_FILE_SUFFIX));
return new LocalCommands(directory, ksqlEngine, LocalCommandsFile.createWriteable(file));
}

private void markFileAsProcessed(File file) {
File updatedName = new File(file.getParentFile(),
private void markFileAsProcessed(final File file) {
final File updatedName = new File(file.getParentFile(),
file.getName() + LOCAL_COMMANDS_PROCESSED_SUFFIX);
if (!file.renameTo(updatedName)) {
throw new KsqlException("Couldn't rename file " + file.getAbsolutePath());
Expand All @@ -148,7 +163,7 @@ private void markFileAsProcessed(File file) {
private void cleanUpTransientQueryState(
final List<LocalCommand> localCommands,
final ServiceContext serviceContext) {
Set<String> queryApplicationIds = localCommands.stream()
final Set<String> queryApplicationIds = localCommands.stream()
.filter(c -> c.getType() == Type.TRANSIENT_QUERY)
.map(LocalCommand::getQueryApplicationId)
.collect(Collectors.toSet());
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
/*
* Copyright 2020 Confluent Inc.
*
* Licensed under the Confluent Community License (the "License"); you may not use
* this file except in compliance with the License. You may obtain a copy of the
* License at
*
* http://www.confluent.io/confluent-community-license
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package io.confluent.ksql.rest.server;

import com.fasterxml.jackson.databind.ObjectMapper;
Expand All @@ -16,7 +31,7 @@
/**
* Represents a single file of commands issued to this node.
*/
public class LocalCommandsFile implements Closeable {
public final class LocalCommandsFile implements Closeable {

private static final ObjectMapper MAPPER = new ObjectMapper();
private static final byte[] NEW_LINE_SEPARATOR_BYTES =
Expand Down Expand Up @@ -49,7 +64,7 @@ public void write(final LocalCommand localCommand) throws IOException {
throw new IOException("Write permission denied.");
}

byte[] bytes = MAPPER.writeValueAsBytes(localCommand);
final byte[] bytes = MAPPER.writeValueAsBytes(localCommand);
writer.write(bytes);
writer.write(NEW_LINE_SEPARATOR_BYTES);
writer.flush();
Expand All @@ -58,7 +73,7 @@ public void write(final LocalCommand localCommand) throws IOException {
public List<LocalCommand> readRecords() throws IOException {
final List<LocalCommand> localCommands = new ArrayList<>();
for (final String line : Files.readAllLines(file.toPath(), StandardCharsets.UTF_8)) {
LocalCommand localCommand = MAPPER.readValue(line, LocalCommand.class);
final LocalCommand localCommand = MAPPER.readValue(line, LocalCommand.class);
localCommands.add(localCommand);
}
return localCommands;
Expand Down

0 comments on commit 5d6f964

Please sign in to comment.