Fixing checkstyling
lokesh-lingarajan committed Aug 12, 2021
1 parent 5811af4 commit fc43075
Showing 5 changed files with 80 additions and 59 deletions.
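This is a pure style cleanup within the Kafka input format work: unused imports are dropped, import order is fixed, catch moves onto its own line, spaces are added after commas and around casts, and anonymous classes and double-brace initializers are re-indented. The snippet below is a condensed before/after sketch of the rules being enforced — illustrative only, not code from the repository:

// Before (fails Druid checkstyle):
//   try { parse(bytes); } catch (Exception e) { if (e instanceof IOException){ throw (IOException)e; } }
// After (passes): catch on its own line, space before '{', space after the cast and after commas.
import java.io.IOException;

public class StyleSketch
{
  public void read(byte[] bytes) throws IOException
  {
    try {
      parse(bytes);
    }
    catch (Exception e) {
      if (e instanceof IOException) {
        throw (IOException) e;
      }
      handle(e, true);
    }
  }

  private void parse(byte[] bytes) throws IOException {}

  private void handle(Exception e, boolean shouldLog) {}
}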
@@ -19,7 +19,6 @@

package org.apache.druid.data.input.kafkainput;

-import org.apache.druid.data.input.InputRow;
import java.io.IOException;
import java.util.Map;

@@ -21,21 +21,17 @@

import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
import org.apache.druid.data.input.InputEntity;
import org.apache.druid.data.input.InputEntityReader;
import org.apache.druid.data.input.InputFormat;
import org.apache.druid.data.input.InputRowSchema;
import org.apache.druid.data.input.impl.ByteEntity;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.java.util.common.DateTimes;
import org.joda.time.DateTime;

import javax.annotation.Nullable;
import java.io.File;
import java.util.Collections;
import java.util.Objects;

public class KafkaInputFormat implements InputFormat
@@ -19,7 +19,6 @@

package org.apache.druid.data.input.kafkainput;

-import org.apache.druid.java.util.common.logger.Logger;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import org.apache.druid.data.input.InputEntityReader;
@@ -29,6 +28,7 @@
import org.apache.druid.data.input.MapBasedInputRow;
import org.apache.druid.data.input.kafka.KafkaRecordEntity;
import org.apache.druid.java.util.common.CloseableIterators;
+import org.apache.druid.java.util.common.logger.Logger;
import org.apache.druid.java.util.common.parsers.CloseableIterator;
import org.apache.druid.java.util.common.parsers.ParseException;

@@ -91,11 +91,12 @@ public CloseableIterator<InputRow> read() throws IOException
keyRow.getEvent().entrySet().stream().findFirst().get().getValue()
);
}
-} catch (Exception e) {
-if (e instanceof IOException){
+}
+catch (Exception e) {
+if (e instanceof IOException) {
log.error(e, "Encountered IOException during key parsing.");
-throw (IOException)e;
-} else if (e instanceof ParseException) {
+throw (IOException) e;
+} else if (e instanceof ParseException) {
log.error(e, "Encountered key parsing exception.");
} else {
log.error(e, "Encountered exception during key parsing.");
@@ -139,11 +140,12 @@ public CloseableIterator<InputRow> read() throws IOException
event
));
}
-} catch (Exception e) {
-if (e instanceof IOException){
+}
+catch (Exception e) {
+if (e instanceof IOException) {
log.error(e, "Encountered IOException during value parsing.");
-throw (IOException)e;
-} else if (e instanceof ParseException) {
+throw (IOException) e;
+} else if (e instanceof ParseException) {
log.error(e, "Encountered value parsing exception.");
} else {
log.error(e, "Encountered exception during value parsing.");
@@ -178,4 +180,3 @@ public CloseableIterator<InputRowListPlusRawValues> sample() throws IOException
return read().map(row -> InputRowListPlusRawValues.of(row, ((MapBasedInputRow) row).getEvent()));
}
}
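A note on the pattern above: the reader catches the broad Exception and branches with instanceof so that a checked IOException propagates to the caller, while Druid's ParseException (an unchecked exception) is only logged, so one bad record does not kill the read. A minimal sketch of the same shape using separate catch clauses — illustrative only, with RuntimeException standing in for Druid's ParseException:

import java.io.IOException;

public class CatchShapeSketch
{
  interface RowParser
  {
    void parseNext() throws IOException;
  }

  static void readWith(RowParser parser) throws IOException
  {
    try {
      parser.parseNext();
    }
    catch (IOException e) {
      // Propagate I/O failures to the caller, as the committed code does.
      throw e;
    }
    catch (RuntimeException e) {
      // Stand-in for org.apache.druid.java.util.common.parsers.ParseException:
      // log the bad record and keep going.
      System.err.println("Encountered parsing exception: " + e);
    }
  }
}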

@@ -236,15 +236,15 @@ public void testWithOutKey() throws IOException
@Test
public void testTimestampFromHeader() throws IOException
{
-Iterable<Header> SAMPLE_HEADERS_WITH_TS = Iterables.unmodifiableIterable(
+Iterable<Header> sample_header_with_ts = Iterables.unmodifiableIterable(
Iterables.concat(
SAMPLE_HEADERS,
ImmutableList.of(new Header() {
@Override
public String key()
-{
-return "headerTs";
-}
+{
+return "headerTs";
+}
@Override
public byte[] value()
{
@@ -268,7 +268,7 @@ public byte[] value()
+ " }\n"
+ "}");

-Headers headers = new RecordHeaders(SAMPLE_HEADERS_WITH_TS);
+Headers headers = new RecordHeaders(sample_header_with_ts);
inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
null, null, 0, 0,
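The tests above build each Header as an anonymous class. kafka-clients also ships a concrete implementation, org.apache.kafka.common.header.internals.RecordHeader, which would make these fixtures shorter; a sketch, not part of the commit:

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.header.Header;
import org.apache.kafka.common.header.internals.RecordHeader;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeaderFixtureSketch
{
  public static void main(String[] args)
  {
    Header encoding = new RecordHeader("encoding", "application/json".getBytes(StandardCharsets.UTF_8));
    Header headerTs = new RecordHeader("headerTs", "2021-06-24".getBytes(StandardCharsets.UTF_8));
    RecordHeaders headers = new RecordHeaders(new Header[]{encoding, headerTs});
    System.out.println(headers);
  }
}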
@@ -31,38 +31,44 @@

import java.io.IOException;
import java.nio.charset.StandardCharsets;
-import java.util.Map;
import java.util.HashMap;
+import java.util.Map;


public class KafkaStringHeaderFormatTest
{
-private KafkaRecordEntity inputEntity;
-private long timestamp = DateTimes.of("2021-06-24T00:00:00.000Z").getMillis();
-private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(new Header() {
-@Override
-public String key()
-{
-return "encoding";
-}
-@Override
-public byte[] value()
-{
-return "application/json".getBytes(StandardCharsets.UTF_8);
-}
-},
-new Header() {
-@Override
-public String key()
-{
-return "kafkapkc";
-}
-@Override
-public byte[] value()
-{
-return "pkc-bar".getBytes(StandardCharsets.UTF_8);
-}
-});
+private static final Iterable<Header> SAMPLE_HEADERS = ImmutableList.of(
+new Header()
+{
+@Override
+public String key()
+{
+return "encoding";
+}
+
+@Override
+public byte[] value()
+{
+return "application/json".getBytes(StandardCharsets.UTF_8);
+}
+},
+new Header()
+{
+@Override
+public String key()
+{
+return "kafkapkc";
+}
+
+@Override
+public byte[] value()
+{
+return "pkc-bar".getBytes(StandardCharsets.UTF_8);
+}
+}
+);
+private KafkaRecordEntity inputEntity;
+private long timestamp = DateTimes.of("2021-06-24T00:00:00.000Z").getMillis();

@Test
public void testDefaultHeaderFormat() throws IOException
@@ -73,60 +79,71 @@ public void testDefaultHeaderFormat() throws IOException
inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
null, null, 0, 0,
null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers));
Map<String, Object> expectedResults = new HashMap<String, Object>() {{
put("test.kafka.header.encoding","application/json");
put("test.kafka.header.kafkapkc","pkc-bar");
}};
null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
));
Map<String, Object> expectedResults = new HashMap<String, Object>() {
{
put("test.kafka.header.encoding", "application/json");
put("test.kafka.header.kafkapkc", "pkc-bar");
}
};

KafkaHeaderFormat headerInput = new KafkaStringHeaderFormat(null);
-KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(),headerLabelPrefix);
+KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(), headerLabelPrefix);
Assert.assertEquals(expectedResults, headerParser.read());
}
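The expected-results map above uses the double-brace-initializer idiom (an anonymous HashMap subclass with an instance initializer); the commit only re-indents it to satisfy checkstyle. Guava's ImmutableMap, already on the test classpath alongside ImmutableList, would express the same fixture without the anonymous subclass — a sketch, not part of the commit:

import com.google.common.collect.ImmutableMap;
import java.util.Map;

public class ExpectedResultsSketch
{
  static final Map<String, Object> EXPECTED = ImmutableMap.of(
      "test.kafka.header.encoding", "application/json",
      "test.kafka.header.kafkapkc", "pkc-bar"
  );
}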

@Test
public void testASCIIHeaderFormat() throws IOException
{
Iterable<Header> header = ImmutableList.of(
-new Header() {
+new Header()
+{
@Override
public String key()
{
return "encoding";
}

@Override
public byte[] value()
{
return "application/json".getBytes(StandardCharsets.US_ASCII);
}
},
-new Header() {
+new Header()
+{
@Override
public String key()
{
return "kafkapkc";
}

@Override
public byte[] value()
{
return "pkc-bar".getBytes(StandardCharsets.US_ASCII);
}
-});
+}
+);

String headerLabelPrefix = "test.kafka.header.";
String timestampLablePrefix = "test.kafka.newts.";
Headers headers = new RecordHeaders(header);
inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
null, null, 0, 0,
null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers));
Map<String, Object> expectedResults = new HashMap<String, Object>() {{
put("test.kafka.header.encoding","application/json");
put("test.kafka.header.kafkapkc","pkc-bar");
}};
null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
));
Map<String, Object> expectedResults = new HashMap<String, Object>() {
{
put("test.kafka.header.encoding", "application/json");
put("test.kafka.header.kafkapkc", "pkc-bar");
}
};

KafkaHeaderFormat headerInput = new KafkaStringHeaderFormat("US-ASCII");
-KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(),headerLabelPrefix);
+KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(), headerLabelPrefix);
Map<String, Object> row = headerParser.read();
Assert.assertEquals(expectedResults, row);
}
@@ -135,45 +152,53 @@ public byte[] value()
public void testIllegalHeaderCharacter() throws IOException
{
Iterable<Header> header = ImmutableList.of(
-new Header() {
+new Header()
+{
@Override
public String key()
{
return "encoding";
}

@Override
public byte[] value()
{
return "€pplic€tion/json".getBytes(StandardCharsets.US_ASCII);
}
},
-new Header() {
+new Header()
+{
@Override
public String key()
{
return "kafkapkc";
}

@Override
public byte[] value()
{
return "pkc-bar".getBytes(StandardCharsets.US_ASCII);
}
-});
+}
+);

String headerLabelPrefix = "test.kafka.header.";
String timestampLablePrefix = "test.kafka.newts.";
Headers headers = new RecordHeaders(header);
inputEntity = new KafkaRecordEntity(new ConsumerRecord<byte[], byte[]>(
"sample", 0, 0, timestamp,
null, null, 0, 0,
null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers));
Map<String, Object> expectedResults = new HashMap<String, Object>() {{
put("test.kafka.header.encoding","?pplic?tion/json");
put("test.kafka.header.kafkapkc","pkc-bar");
}};
null, "sampleValue".getBytes(StandardCharsets.UTF_8), headers
));
Map<String, Object> expectedResults = new HashMap<String, Object>() {
{
put("test.kafka.header.encoding", "?pplic?tion/json");
put("test.kafka.header.kafkapkc", "pkc-bar");
}
};

KafkaHeaderFormat headerInput = new KafkaStringHeaderFormat("US-ASCII");
-KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(),headerLabelPrefix);
+KafkaHeaderReader headerParser = headerInput.createReader(inputEntity.getRecord().headers(), headerLabelPrefix);
Map<String, Object> row = headerParser.read();
Assert.assertEquals(expectedResults, row);
}
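Why testIllegalHeaderCharacter expects "?pplic?tion/json": String.getBytes(StandardCharsets.US_ASCII) replaces every character the charset cannot encode, such as the euro sign, with the byte for '?', so the header value is already corrupted before the reader ever sees it. A standalone demonstration, independent of the Druid code:

import java.nio.charset.StandardCharsets;

public class AsciiReplacementSketch
{
  public static void main(String[] args)
  {
    byte[] bytes = "€pplic€tion/json".getBytes(StandardCharsets.US_ASCII);
    System.out.println(new String(bytes, StandardCharsets.US_ASCII)); // prints ?pplic?tion/json
  }
}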
