From fc430753a1d665afaf208bb8b15dd809a630a829 Mon Sep 17 00:00:00 2001 From: Lokesh Lingarajan Date: Thu, 5 Aug 2021 10:15:52 -0700 Subject: [PATCH] Fixing checkstyling --- .../input/kafkainput/KafkaHeaderReader.java | 1 - .../input/kafkainput/KafkaInputFormat.java | 4 - .../input/kafkainput/KafkaInputReader.java | 21 ++-- .../kafkainput/KafkaInputFormatTest.java | 10 +- .../KafkaStringHeaderFormatTest.java | 103 +++++++++++------- 5 files changed, 80 insertions(+), 59 deletions(-) diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaHeaderReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaHeaderReader.java index 79098f1de8b1..e1da7f0116ff 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaHeaderReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaHeaderReader.java @@ -19,7 +19,6 @@ package org.apache.druid.data.input.kafkainput; -import org.apache.druid.data.input.InputRow; import java.io.IOException; import java.util.Map; diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java index 3dbf735d5d71..721ae02bafbb 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputFormat.java @@ -21,21 +21,17 @@ import com.fasterxml.jackson.annotation.JsonProperty; import com.google.common.base.Preconditions; -import com.google.common.collect.ImmutableList; import org.apache.druid.data.input.InputEntity; import org.apache.druid.data.input.InputEntityReader; import org.apache.druid.data.input.InputFormat; import org.apache.druid.data.input.InputRowSchema; import org.apache.druid.data.input.impl.ByteEntity; -import org.apache.druid.data.input.impl.DimensionsSpec; import org.apache.druid.data.input.impl.TimestampSpec; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.java.util.common.DateTimes; -import org.joda.time.DateTime; import javax.annotation.Nullable; import java.io.File; -import java.util.Collections; import java.util.Objects; public class KafkaInputFormat implements InputFormat diff --git a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java index e031ba7fd050..e0360bdf6b2c 100644 --- a/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java +++ b/extensions-core/kafka-indexing-service/src/main/java/org/apache/druid/data/input/kafkainput/KafkaInputReader.java @@ -19,7 +19,6 @@ package org.apache.druid.data.input.kafkainput; -import org.apache.druid.java.util.common.logger.Logger; import com.google.common.collect.Lists; import com.google.common.collect.Sets; import org.apache.druid.data.input.InputEntityReader; @@ -29,6 +28,7 @@ import org.apache.druid.data.input.MapBasedInputRow; import org.apache.druid.data.input.kafka.KafkaRecordEntity; import org.apache.druid.java.util.common.CloseableIterators; +import org.apache.druid.java.util.common.logger.Logger; import org.apache.druid.java.util.common.parsers.CloseableIterator; import org.apache.druid.java.util.common.parsers.ParseException; @@ -91,11 +91,12 @@ public CloseableIterator read() throws IOException keyRow.getEvent().entrySet().stream().findFirst().get().getValue() ); } - } catch (Exception e) { - if (e instanceof IOException){ + } + catch (Exception e) { + if (e instanceof IOException) { log.error(e, "Encountered IOException during key parsing."); - throw (IOException)e; - } else if (e instanceof ParseException) { + throw (IOException) e; + } else if (e instanceof ParseException) { log.error(e, "Encountered key parsing exception."); } else { log.error(e, "Encountered exception during key parsing."); @@ -139,11 +140,12 @@ public CloseableIterator read() throws IOException event )); } - } catch (Exception e) { - if (e instanceof IOException){ + } + catch (Exception e) { + if (e instanceof IOException) { log.error(e, "Encountered IOException during value parsing."); - throw (IOException)e; - } else if (e instanceof ParseException) { + throw (IOException) e; + } else if (e instanceof ParseException) { log.error(e, "Encountered value parsing exception."); } else { log.error(e, "Encountered exception during value parsing."); @@ -178,4 +180,3 @@ public CloseableIterator sample() throws IOException return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent())); } } - diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java index b177b00d834c..98fe2f03ecb8 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaInputFormatTest.java @@ -236,15 +236,15 @@ public void testWithOutKey() throws IOException @Test public void testTimestampFromHeader() throws IOException { - Iterable
SAMPLE_HEADERS_WITH_TS = Iterables.unmodifiableIterable( + Iterable
sample_header_with_ts = Iterables.unmodifiableIterable( Iterables.concat( SAMPLE_HEADERS, ImmutableList.of(new Header() { @Override public String key() - { - return "headerTs"; - } + { + return "headerTs"; + } @Override public byte[] value() { @@ -268,7 +268,7 @@ public byte[] value() + " }\n" + "}"); - Headers headers = new RecordHeaders(SAMPLE_HEADERS_WITH_TS); + Headers headers = new RecordHeaders(sample_header_with_ts); inputEntity = new KafkaRecordEntity(new ConsumerRecord( "sample", 0, 0, timestamp, null, null, 0, 0, diff --git a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java index 1c0ae0f20cbe..36602eb871c0 100644 --- a/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java +++ b/extensions-core/kafka-indexing-service/src/test/java/org/apache/druid/data/input/kafkainput/KafkaStringHeaderFormatTest.java @@ -31,38 +31,44 @@ import java.io.IOException; import java.nio.charset.StandardCharsets; -import java.util.Map; import java.util.HashMap; +import java.util.Map; public class KafkaStringHeaderFormatTest { - private KafkaRecordEntity inputEntity; - private long timestamp = DateTimes.of("2021-06-24T00:00:00.000Z").getMillis(); - private static final Iterable
SAMPLE_HEADERS = ImmutableList.of(new Header() { - @Override - public String key() + private static final Iterable
SAMPLE_HEADERS = ImmutableList.of( + new Header() { - return "encoding"; - } - @Override - public byte[] value() + @Override + public String key() + { + return "encoding"; + } + + @Override + public byte[] value() + { + return "application/json".getBytes(StandardCharsets.UTF_8); + } + }, + new Header() { - return "application/json".getBytes(StandardCharsets.UTF_8); - } - }, - new Header() { @Override public String key() { return "kafkapkc"; } + @Override public byte[] value() { return "pkc-bar".getBytes(StandardCharsets.UTF_8); } - }); + } + ); + private KafkaRecordEntity inputEntity; + private long timestamp = DateTimes.of("2021-06-24T00:00:00.000Z").getMillis(); @Test public void testDefaultHeaderFormat() throws IOException @@ -73,14 +79,17 @@ public void testDefaultHeaderFormat() throws IOException inputEntity = new KafkaRecordEntity(new ConsumerRecord( "sample", 0, 0, timestamp, null, null, 0, 0, - null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers)); - Map expectedResults = new HashMap() {{ - put("test.kafka.header.encoding","application/json"); - put("test.kafka.header.kafkapkc","pkc-bar"); - }}; + null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers + )); + Map expectedResults = new HashMap() { + { + put("test.kafka.header.encoding", "application/json"); + put("test.kafka.header.kafkapkc", "pkc-bar"); + } + }; KafkaHeaderFormat headerInput = new KafkaStringHeaderFormat(null); - KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(),headerLabelPrefix); + KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(), headerLabelPrefix); Assert.assertEquals(expectedResults, headerParser.read()); } @@ -88,30 +97,35 @@ public void testDefaultHeaderFormat() throws IOException public void testASCIIHeaderFormat() throws IOException { Iterable
header = ImmutableList.of( - new Header() { + new Header() + { @Override public String key() { return "encoding"; } + @Override public byte[] value() { return "application/json".getBytes(StandardCharsets.US_ASCII); } }, - new Header() { + new Header() + { @Override public String key() { return "kafkapkc"; } + @Override public byte[] value() { return "pkc-bar".getBytes(StandardCharsets.US_ASCII); } - }); + } + ); String headerLabelPrefix = "test.kafka.header."; String timestampLablePrefix = "test.kafka.newts."; @@ -119,14 +133,17 @@ public byte[] value() inputEntity = new KafkaRecordEntity(new ConsumerRecord( "sample", 0, 0, timestamp, null, null, 0, 0, - null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers)); - Map expectedResults = new HashMap() {{ - put("test.kafka.header.encoding","application/json"); - put("test.kafka.header.kafkapkc","pkc-bar"); - }}; + null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers + )); + Map expectedResults = new HashMap() { + { + put("test.kafka.header.encoding", "application/json"); + put("test.kafka.header.kafkapkc", "pkc-bar"); + } + }; KafkaHeaderFormat headerInput = new KafkaStringHeaderFormat("US-ASCII"); - KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(),headerLabelPrefix); + KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(), headerLabelPrefix); Map row = headerParser.read(); Assert.assertEquals(expectedResults, row); } @@ -135,30 +152,35 @@ public byte[] value() public void testIllegalHeaderCharacter() throws IOException { Iterable
header = ImmutableList.of( - new Header() { + new Header() + { @Override public String key() { return "encoding"; } + @Override public byte[] value() { return "€pplic€tion/json".getBytes(StandardCharsets.US_ASCII); } }, - new Header() { + new Header() + { @Override public String key() { return "kafkapkc"; } + @Override public byte[] value() { return "pkc-bar".getBytes(StandardCharsets.US_ASCII); } - }); + } + ); String headerLabelPrefix = "test.kafka.header."; String timestampLablePrefix = "test.kafka.newts."; @@ -166,14 +188,17 @@ public byte[] value() inputEntity = new KafkaRecordEntity(new ConsumerRecord( "sample", 0, 0, timestamp, null, null, 0, 0, - null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers)); - Map expectedResults = new HashMap() {{ - put("test.kafka.header.encoding","?pplic?tion/json"); - put("test.kafka.header.kafkapkc","pkc-bar"); - }}; + null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers + )); + Map expectedResults = new HashMap() { + { + put("test.kafka.header.encoding", "?pplic?tion/json"); + put("test.kafka.header.kafkapkc", "pkc-bar"); + } + }; KafkaHeaderFormat headerInput = new KafkaStringHeaderFormat("US-ASCII"); - KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(),headerLabelPrefix); + KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(), headerLabelPrefix); Map row = headerParser.read(); Assert.assertEquals(expectedResults, row); }