Skip to content

Commit

Permalink
Enhance Kinesis consumer (apache#12806)
Browse files Browse the repository at this point in the history
* Enhance Kinesis consumer

* Simplify the handling

* Address comments
  • Loading branch information
Jackie-Jiang authored and gortiz committed Jun 14, 2024
1 parent 13d1723 commit dc8eb66
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 287 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -431,31 +431,4 @@ public void testConsumerTypes() {
// expected
}
}

@Test
public void testKinesisFetchTimeout() {
String streamType = "fakeStream";
String topic = "fakeTopic";
String tableName = "fakeTable_REALTIME";
String consumerFactoryClass = "KinesisConsumerFactory";
String decoderClass = FakeStreamMessageDecoder.class.getName();

Map<String, String> streamConfigMap = new HashMap<>();
streamConfigMap.put(StreamConfigProperties.STREAM_TYPE, streamType);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_TOPIC_NAME), topic);
streamConfigMap.put(StreamConfigProperties.constructStreamProperty(streamType,
StreamConfigProperties.STREAM_CONSUMER_FACTORY_CLASS), consumerFactoryClass);
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_DECODER_CLASS),
decoderClass);

String consumerType = "simple";
streamConfigMap.put(
StreamConfigProperties.constructStreamProperty(streamType, StreamConfigProperties.STREAM_CONSUMER_TYPES),
consumerType);
StreamConfig streamConfig = new StreamConfig(tableName, streamConfigMap);

assertEquals(streamConfig.getFetchTimeoutMillis(), StreamConfig.DEFAULT_STREAM_FETCH_TIMEOUT_MILLIS_KINESIS);
}
}
7 changes: 0 additions & 7 deletions pinot-integration-tests/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@

<properties>
<pinot.root>${basedir}/..</pinot.root>
<localstack-utils.version>0.2.23</localstack-utils.version>
<testcontainers.version>1.19.8</testcontainers.version>
</properties>

Expand Down Expand Up @@ -312,12 +311,6 @@
<version>${testcontainers.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>cloud.localstack</groupId>
<artifactId>localstack-utils</artifactId>
<version>${localstack-utils.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand Down
60 changes: 4 additions & 56 deletions pinot-plugins/pinot-stream-ingestion/pinot-kinesis/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -60,77 +60,25 @@
<groupId>software.amazon.awssdk</groupId>
<artifactId>kinesis</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>apache-client</artifactId>
<version>${aws.sdk.version}</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-core</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>sts</artifactId>
</dependency>

<dependency>
<groupId>org.reactivestreams</groupId>
<artifactId>reactive-streams</artifactId>
</dependency>

<!-- TODO: Move LocalStack related class to integration test module -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-buffer</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport</artifactId>
</dependency>

<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-common</artifactId>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<groupId>cloud.localstack</groupId>
<artifactId>localstack-utils</artifactId>
<version>${localstack-utils.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>cloud.localstack</groupId>
<artifactId>localstack-utils</artifactId>
<version>${localstack-utils.version}</version>
</dependency>

</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,7 @@ public class KinesisConfig {
public static final String SESSION_DURATION_SECONDS = "sessionDurationSeconds";
public static final String ASYNC_SESSION_UPDATED_ENABLED = "asyncSessionUpdateEnabled";

// TODO: this is a starting point, until a better default is figured out
public static final String DEFAULT_MAX_RECORDS = "20";
public static final String DEFAULT_MAX_RECORDS = "10000";
public static final String DEFAULT_SHARD_ITERATOR_TYPE = ShardIteratorType.LATEST.toString();
public static final String DEFAULT_IAM_ROLE_BASED_ACCESS_ENABLED = "false";
public static final String DEFAULT_SESSION_DURATION_SECONDS = "900";
Expand Down
Loading

0 comments on commit dc8eb66

Please sign in to comment.